Skip to content

Commit

Permalink
feat: Update main.go to load S3 file from a different endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
WangYihang committed Aug 29, 2024
1 parent 2cedeb7 commit 3d48b32
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 157 deletions.
2 changes: 1 addition & 1 deletion .goreleaser.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# yaml-language-server: $schema=https://goreleaser.com/static/schema.json
# vim: set ts=2 sw=2 tw=0 fo=cnqoj

version: 1
version: 2

before:
hooks:
Expand Down
7 changes: 7 additions & 0 deletions examples/load-s3-store-s3/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
## Usage

Use a path of the following form as the input file path, replacing the masked `access_key` and `secret_key` values with your own credentials.

```
s3://default/data.json?region=us-west-2&endpoint=s3.amazonaws.com&access_key=********************&secret_key=****************************************
```
34 changes: 34 additions & 0 deletions examples/load-s3-store-s3/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package main

import (
"log/slog"

"github.com/WangYihang/gojob/pkg/utils"
"github.com/WangYihang/uio"
)

// main copies every line of the S3 object s3://uio/input.txt into the S3
// object s3://uio/output.txt, logging each line as it goes. Both objects are
// addressed on the S3-compatible endpoint 127.0.0.1:9000 over an insecure
// connection (insecure=true) with the credentials embedded in the URLs.
//
// Write and Close errors are reported instead of being dropped, so a failed
// write or a failed close does not pass silently.
func main() {
	resultFd, err := uio.Open("s3://uio/output.txt" +
		"?endpoint=127.0.0.1:9000" +
		"&access_key=minioadmin" +
		"&secret_key=minioadmin" +
		"&insecure=true" +
		"&mode=write",
	)
	if err != nil {
		slog.Error("failed to open result file", slog.String("error", err.Error()))
		return
	}
	defer func() {
		// NOTE(review): for a remote object the close step may be where data
		// is actually committed — confirm against uio; either way its error
		// must not be ignored.
		if err := resultFd.Close(); err != nil {
			slog.Error("failed to close result file", slog.String("error", err.Error()))
		}
	}()

	for line := range utils.Cat(
		"s3://uio/input.txt" +
			"?endpoint=127.0.0.1:9000" +
			"&access_key=minioadmin" +
			"&secret_key=minioadmin" +
			"&insecure=true" +
			"&mode=read",
	) {
		slog.Info("s3", slog.String("line", line))
		if _, err := resultFd.Write([]byte(line + "\n")); err != nil {
			// Stop at the first failed write: continuing would produce an
			// output with silently missing lines.
			slog.Error("failed to write result line", slog.String("error", err.Error()))
			return
		}
	}
}
23 changes: 12 additions & 11 deletions examples/stdio/main.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
package main

import (
"math/rand"
"time"
"fmt"

"github.com/WangYihang/gojob"
"github.com/WangYihang/gojob/pkg/utils"
)

type MyTask struct{}
type MyPrinterTask struct {
line string
}

func New() *MyTask {
return &MyTask{}
func New(line string) *MyPrinterTask {
return &MyPrinterTask{
line: line,
}
}

func (t *MyTask) Do() error {
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
func (t *MyPrinterTask) Do() error {
fmt.Println(t.line)
return nil
}

Expand All @@ -24,14 +27,12 @@ func main() {
gojob.WithNumWorkers(8),
gojob.WithMaxRetries(4),
gojob.WithMaxRuntimePerTaskSeconds(16),
gojob.WithNumShards(4),
gojob.WithShard(0),
gojob.WithResultFilePath("-"),
gojob.WithStatusFilePath("status.json"),
).
Start()
for range utils.Cat("-") {
scheduler.Submit(New())
for line := range utils.Cat("-") {
scheduler.Submit(New(line))
}
scheduler.Wait()
}
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require (
github.com/WangYihang/uio v0.0.0-20240829150037-ce8fff04fa48
github.com/google/uuid v1.6.0
github.com/jessevdk/go-flags v1.5.0
github.com/minio/minio-go/v7 v7.0.75
github.com/prometheus/client_golang v1.19.0
github.com/prometheus/client_model v0.6.1
)
Expand All @@ -21,11 +20,10 @@ require (
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
github.com/minio/minio-go/v7 v7.0.75 // indirect
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/rs/xid v1.5.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/sys v0.21.0 // indirect
Expand Down
20 changes: 0 additions & 20 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,17 +1,3 @@
github.com/WangYihang/uio v0.0.0-20240823105731-198f3f476e5b h1:1I+0pNZhyX14QB88YhyfVt4v+GYYl++Iifl80Ia8dEI=
github.com/WangYihang/uio v0.0.0-20240823105731-198f3f476e5b/go.mod h1:HTnt+mSC0TQ3X11gfVeXq5MIbMnAWv/JYRbWzGM8b/w=
github.com/WangYihang/uio v0.0.0-20240823112936-bc0b8701f084 h1:qZZz52peiA/5yFM5HSejQPJHVOMqAZ+sEm/ewDxPzCc=
github.com/WangYihang/uio v0.0.0-20240823112936-bc0b8701f084/go.mod h1:0AO9XwgENfIEpS5lo7doL3CxIRXK1LkgLasTEl6xAG4=
github.com/WangYihang/uio v0.0.0-20240823113655-a867da1ceb34 h1:/nOu6QoE4SuMBrZGZKR5bDNSQofUv8QTJMdpdffd5+Q=
github.com/WangYihang/uio v0.0.0-20240823113655-a867da1ceb34/go.mod h1:0AO9XwgENfIEpS5lo7doL3CxIRXK1LkgLasTEl6xAG4=
github.com/WangYihang/uio v0.0.0-20240823113814-1c82dd5fe131 h1:haqL2GQ1dRDcOgTpJkBIJcI6j4xK0+ydrLuU3M3V3FU=
github.com/WangYihang/uio v0.0.0-20240823113814-1c82dd5fe131/go.mod h1:0AO9XwgENfIEpS5lo7doL3CxIRXK1LkgLasTEl6xAG4=
github.com/WangYihang/uio v0.0.0-20240823113922-10bd9224d4c1 h1:zKWH0lwVIssDDYd8GvPgxmtXKSxvf47v2H1EtBNs0WA=
github.com/WangYihang/uio v0.0.0-20240823113922-10bd9224d4c1/go.mod h1:0AO9XwgENfIEpS5lo7doL3CxIRXK1LkgLasTEl6xAG4=
github.com/WangYihang/uio v0.0.0-20240823114425-34e6b93251cd h1:1e3rdqd+Rlf5rMznGmV3zxmQmXAmSWfEUaS+H/li3p8=
github.com/WangYihang/uio v0.0.0-20240823114425-34e6b93251cd/go.mod h1:0AO9XwgENfIEpS5lo7doL3CxIRXK1LkgLasTEl6xAG4=
github.com/WangYihang/uio v0.0.0-20240826152024-f144bed245c7 h1:hcqowlVNExdNjr3M6yMR778GQJoVs6dxy/dybJBzF9o=
github.com/WangYihang/uio v0.0.0-20240826152024-f144bed245c7/go.mod h1:0AO9XwgENfIEpS5lo7doL3CxIRXK1LkgLasTEl6xAG4=
github.com/WangYihang/uio v0.0.0-20240829150037-ce8fff04fa48 h1:PUBSQ5ANsRjj6g0jYPKVIo5m50iTS6gxY6TfCl29Rhk=
github.com/WangYihang/uio v0.0.0-20240829150037-ce8fff04fa48/go.mod h1:T4KtUOTAaSgFJNlU8mEMi5tPSUM1P8Fe1SrjMUwbxZ4=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
Expand Down Expand Up @@ -57,12 +43,6 @@ github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc=
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI=
golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM=
golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
Expand Down
124 changes: 3 additions & 121 deletions pkg/utils/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,11 @@ package utils

import (
"bufio"
"compress/gzip"
"context"
"fmt"
"io"
"log/slog"
"net/url"
"os"
"path/filepath"
"strings"

"github.com/WangYihang/uio"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
)

// Head takes a channel and returns a channel with the first n items
Expand Down Expand Up @@ -86,7 +78,9 @@ func Cat(filePath string) <-chan string {

scanner := bufio.NewScanner(file.(io.Reader)) // Change the type of file to io.Reader
for scanner.Scan() {
out <- strings.TrimSpace(scanner.Text()) // Send the line to the channel
line := scanner.Text()
slog.Info("reading line", slog.String("line", line))
out <- strings.TrimSpace(line) // Send the line to the channel
}

// Check for errors during Scan, excluding EOF
Expand All @@ -105,115 +99,3 @@ func Count[T any](in <-chan T) (count int64) {
}
return count
}

// ReadDiscardCloser bundles an io.Writer and an io.Reader into a single value
// and gives it a Close method that does nothing, so the pair can be used
// wherever an io.ReadWriteCloser is expected.
//
// Either embedded field may be left nil; calling the corresponding Read or
// Write on such a value dereferences the nil interface.
type ReadDiscardCloser struct {
	io.Writer
	io.Reader
}

// Close is a no-op that always returns nil. It does not close the embedded
// Writer or Reader.
func (ReadDiscardCloser) Close() error { return nil }

// OpenFile opens path with the backend chosen by the protocol that
// ParseProtocol reports for it: "s3" is delegated to OpenS3File and "file" to
// OpenLocalFile. Any other protocol is logged as a warning and rejected with
// an "unsupported protocol" error. An error from ParseProtocol is returned
// unchanged.
func OpenFile(path string) (io.WriteCloser, error) {
	scheme, err := ParseProtocol(path)
	if err != nil {
		return nil, err
	}

	if scheme == "s3" {
		slog.Info("opening s3 file", slog.String("path", path))
		return OpenS3File(path)
	}

	if scheme == "file" {
		slog.Info("opening local file", slog.String("path", path))
		return OpenLocalFile(path)
	}

	slog.Warn("unsupported protocol", slog.String("protocol", scheme))
	return nil, fmt.Errorf("unsupported protocol: %s", scheme)
}

// OpenS3File downloads an S3 object into a temporary local file and returns
// that file opened through OpenLocalFile.
//
// The path is a URL whose host is the bucket, whose path is the object key,
// and whose query carries the connection settings, e.g.
// s3://default/data.json?region=us-west-1&endpoint=s3.amazonaws.com&access_key=********************&secret_key=****************************************
//
// When the endpoint parameter is absent, "s3.amazonaws.com" is used. The
// client is always created with Secure set to true.
//
// An error is returned when the path cannot be parsed, the client cannot be
// created, the object cannot be fetched or copied, or the temporary file
// cannot be created or closed. On failure after the temporary file has been
// created, the file is removed again.
func OpenS3File(path string) (io.WriteCloser, error) {
	// Parse the path
	parsed, err := url.Parse(path)
	if err != nil {
		// url.Error embeds the full URL, which contains the secret key;
		// report only the underlying cause.
		if uerr, ok := err.(*url.Error); ok {
			err = uerr.Err
		}
		return nil, fmt.Errorf("parsing s3 path: %w", err)
	}
	bucketName := parsed.Host
	objectKey := strings.TrimLeft(parsed.Path, "/")
	query := parsed.Query()
	endpoint := query.Get("endpoint")
	if endpoint == "" {
		endpoint = "s3.amazonaws.com"
	}
	accessKey := query.Get("access_key")
	secretKey := query.Get("secret_key")
	region := query.Get("region")
	slog.Info(
		"parsed s3 path",
		slog.String("access_key", accessKey),
		// Never write the secret itself to the log.
		slog.String("secret_key", "[redacted]"),
		slog.String("bucket", bucketName),
		slog.String("object", objectKey),
		slog.String("endpoint", endpoint),
		slog.String("region", region),
	)

	// Download file from S3 into a temporary file
	slog.Info("downloading file from s3", slog.String("bucket", bucketName), slog.String("object", objectKey))
	s3Client, err := minio.New(endpoint, &minio.Options{
		Creds:  credentials.NewStaticV4(accessKey, secretKey, ""),
		Secure: true,
		Region: region,
	})
	if err != nil {
		return nil, err
	}
	reader, err := s3Client.GetObject(context.Background(), bucketName, objectKey, minio.GetObjectOptions{})
	if err != nil {
		return nil, err
	}
	defer reader.Close()

	fd, err := os.CreateTemp("", "gojob-*")
	if err != nil {
		return nil, err
	}
	tmpPath := fd.Name()

	if _, err := io.Copy(fd, reader); err != nil {
		// Do not leave a partial download behind.
		fd.Close()
		os.Remove(tmpPath)
		return nil, err
	}
	// Close the download handle before the file is reopened below, and
	// surface any error from closing it.
	if err := fd.Close(); err != nil {
		os.Remove(tmpPath)
		return nil, err
	}

	// Open the temporary file
	slog.Info("opening local file", slog.String("path", tmpPath))
	return OpenLocalFile(tmpPath)
}

// OpenLocalFile opens path on the local filesystem.
//
//   - "-" yields a value that reads from os.Stdin and writes to os.Stdout.
//   - "" yields a value that discards all writes and reads as empty (EOF).
//   - A path ending in ".gz" is opened and wrapped in a gzip reader; writes
//     to the returned value are discarded.
//   - Any other path is opened read/write in append mode, creating the file
//     (mode 0644) and its parent directories (mode 0755) if necessary.
//
// On any error the returned io.ReadWriteCloser is nil.
//
// NOTE(review): in the ".gz" case the returned ReadDiscardCloser has a no-op
// Close, so the underlying file descriptor is never closed by the caller.
func OpenLocalFile(path string) (io.ReadWriteCloser, error) {
	switch path {
	case "-":
		return ReadDiscardCloser{Writer: os.Stdout, Reader: os.Stdin}, nil
	case "":
		// An empty reader makes Read report io.EOF instead of panicking on a
		// nil embedded Reader.
		return ReadDiscardCloser{Writer: io.Discard, Reader: strings.NewReader("")}, nil
	}

	// Create folder
	if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
		return nil, err
	}

	// Open file
	fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
	if err != nil {
		// Return a literal nil: returning the nil *os.File would produce a
		// non-nil interface value alongside the error.
		return nil, err
	}
	if !strings.HasSuffix(path, ".gz") {
		return fd, nil
	}

	// Check the file extension, if it is .gz, read it through gzip
	gzipFd, err := gzip.NewReader(fd)
	if err != nil {
		// The header could not be read; release the descriptor.
		fd.Close()
		return nil, err
	}
	return ReadDiscardCloser{Writer: io.Discard, Reader: gzipFd}, nil
}
3 changes: 2 additions & 1 deletion scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/WangYihang/gojob/pkg/utils"
"github.com/WangYihang/uio"
"github.com/google/uuid"
)

Expand Down Expand Up @@ -225,7 +226,7 @@ func WithMetadataFilePath(path string) schedulerOption {

// chanRecorder records the channel to a given file path
func chanRecorder[T *basicTask | Status | schedulerMetadata](path string, ch <-chan T, wg *sync.WaitGroup) {
fd, err := utils.OpenFile(path)
fd, err := uio.Open(path)
if err != nil {
slog.Error("error occurred while opening file", slog.String("path", path), slog.String("error", err.Error()))
return
Expand Down

0 comments on commit 3d48b32

Please sign in to comment.