-
Notifications
You must be signed in to change notification settings - Fork 37
/
s3_reader.go
105 lines (84 loc) · 2.28 KB
/
s3_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package goreplay
import (
"bytes"
"log"
"os"
"strconv"
"strings"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
)
// S3ReadCloser reads a single S3 object, identified by bucket and key. Its
// Read method downloads the object with ranged requests and serves the bytes
// from an in-memory buffer.
type S3ReadCloser struct {
	// bucket and key locate the object in S3.
	bucket, key string

	// offset is the byte position at which the next ranged request starts.
	// totalSize is the object size taken from a response's Content-Range header.
	// readBytes is the running count Read compares against offset to decide
	// when more data has to be downloaded.
	offset, totalSize, readBytes int

	// sess is the AWS session from which S3 clients are created.
	sess *session.Session

	// buf holds downloaded bytes that have not been handed to the caller yet.
	buf *bytes.Buffer
}
// awsConfig builds the AWS SDK configuration from environment variables.
//
// The region is taken from AWS_DEFAULT_REGION, then AWS_REGION, and falls back
// to "us-east-1" when both are empty. A non-empty AWS_ENDPOINT_URL overrides
// the endpoint, and a non-empty AWS_DEBUG sets the SDK log level to
// LogDebugWithHTTPBody. Verbose credential-chain errors are always enabled.
func awsConfig() *aws.Config {
	region := "us-east-1"
	for _, name := range []string{"AWS_DEFAULT_REGION", "AWS_REGION"} {
		if value := os.Getenv(name); value != "" {
			region = value
			break
		}
	}

	cfg := &aws.Config{
		Region:                        aws.String(region),
		CredentialsChainVerboseErrors: aws.Bool(true),
	}

	endpoint := os.Getenv("AWS_ENDPOINT_URL")
	if endpoint != "" {
		cfg.Endpoint = aws.String(endpoint)
		log.Println("Custom endpoint:", endpoint)
	}

	log.Println("Connecting to S3. Region: " + region)

	if debug := os.Getenv("AWS_DEBUG"); debug != "" {
		cfg.LogLevel = aws.LogLevel(aws.LogDebugWithHTTPBody)
	}

	return cfg
}
// NewS3ReadCloser creates a reader for the S3 object addressed by path.
//
// The path is split into bucket and key by parseS3Url, and the AWS session is
// configured by awsConfig. When PRO is false the process is terminated through
// log.Fatal. Session creation is wrapped in session.Must, so a failure there
// is not returned to the caller.
func NewS3ReadCloser(path string) *S3ReadCloser {
	if !PRO {
		log.Fatal("Using S3 input and output require PRO license")
		return nil
	}

	reader := &S3ReadCloser{buf: new(bytes.Buffer)}
	reader.bucket, reader.key = parseS3Url(path)
	reader.sess = session.Must(session.NewSession(awsConfig()))

	log.Println("[S3 Input] S3 connection successfully initialized", path)

	return reader
}
// Read implements io.Reader on top of ranged S3 GetObject requests.
//
// The object is downloaded in chunks of chunkSize bytes into an internal
// buffer and served from there. A chunk is requested on the first call and
// whenever the bytes requested so far, plus len(b), exceed what has been
// downloaded.
//
// Failure handling:
//   - every response body is closed after it has been copied;
//   - the offset advances only by the bytes actually buffered, so a failed or
//     truncated download is retried on a later call instead of being skipped;
//   - a missing or malformed Content-Range header is tolerated;
//   - no request is issued once the offset has reached the known object size;
//   - a download error is returned to the caller when there is no buffered
//     data left to serve, so it cannot be mistaken for end of file. An HTTP
//     416 reply (range starts past the end of the object) is treated as end of
//     data.
//
// When the buffer is drained and nothing more can be fetched, the buffer's
// own Read result (io.EOF for a non-empty b) is returned.
func (s *S3ReadCloser) Read(b []byte) (n int, e error) {
	// Bytes requested per ranged GetObject call (chunks of 1 mb).
	const chunkSize = 1000000
	// HTTP status "Range Not Satisfiable".
	const statusRangeNotSatisfiable = 416

	needChunk := s.readBytes == 0 || s.readBytes+len(b) > s.offset
	// Once the object size is known, a range that starts at or beyond it
	// cannot yield data, so the request is skipped.
	if s.totalSize > 0 && s.offset >= s.totalSize {
		needChunk = false
	}

	if needChunk {
		objectRange := "bytes=" + strconv.Itoa(s.offset) + "-" + strconv.Itoa(s.offset+chunkSize-1)
		resp, err := s3.New(s.sess).GetObject(&s3.GetObjectInput{
			Bucket: aws.String(s.bucket),
			Key:    aws.String(s.key),
			Range:  aws.String(objectRange),
		})

		if err == nil {
			var copied int64
			copied, err = s.buf.ReadFrom(resp.Body)
			// The payload has been buffered (or the copy already failed with
			// err), so an error from Close adds no information.
			_ = resp.Body.Close()

			// Advance only by what was really received; the next request
			// then starts exactly where this one stopped.
			s.offset += int(copied)

			if resp.ContentRange != nil {
				// Expected form: "bytes <first>-<last>/<total>".
				contentRange := *resp.ContentRange
				if slash := strings.LastIndex(contentRange, "/"); slash >= 0 {
					if total, convErr := strconv.Atoi(contentRange[slash+1:]); convErr == nil {
						s.totalSize = total
					}
				}
			} else if err == nil {
				// NOTE(review): a reply without Content-Range is assumed to be
				// a full-object response (the range was not honored), so the
				// data received so far is taken as the whole object — confirm
				// against the endpoints in use.
				s.totalSize = s.offset
			}
		}

		if err != nil {
			log.Println("[S3 Input] Error during getting file", s.bucket, s.key, err)

			// SDK request failures expose the HTTP status code; it is probed
			// through an anonymous interface to avoid a new import.
			failure, ok := err.(interface{ StatusCode() int })
			pastEnd := ok && failure.StatusCode() == statusRangeNotSatisfiable
			if !pastEnd && s.buf.Len() == 0 {
				return 0, err
			}
		}
	}

	s.readBytes += len(b)
	return s.buf.Read(b)
}
// Close is a no-op that always returns nil; it exists so that *S3ReadCloser
// satisfies the ReadCloser interface.
func (*S3ReadCloser) Close() error {
	return nil
}