diff --git a/.goreleaser.yaml b/.goreleaser.yaml index c94b8eb..39225a2 100644 --- a/.goreleaser.yaml +++ b/.goreleaser.yaml @@ -6,7 +6,7 @@ # yaml-language-server: $schema=https://goreleaser.com/static/schema.json # vim: set ts=2 sw=2 tw=0 fo=cnqoj -version: 1 +version: 2 before: hooks: diff --git a/examples/load-s3-store-s3/README.md b/examples/load-s3-store-s3/README.md new file mode 100644 index 0000000..b54bcdf --- /dev/null +++ b/examples/load-s3-store-s3/README.md @@ -0,0 +1,7 @@ +## Usage + +Use the following path as the input filepath. + +``` +s3://default/data.json?region=us-west-2&endpoint=s3.amazonaws.com&access_key=********************&secret_key=**************************************** +``` \ No newline at end of file diff --git a/examples/load-s3-store-s3/main.go b/examples/load-s3-store-s3/main.go new file mode 100644 index 0000000..e85ff7c --- /dev/null +++ b/examples/load-s3-store-s3/main.go @@ -0,0 +1,34 @@ +package main + +import ( + "log/slog" + + "github.com/WangYihang/gojob/pkg/utils" + "github.com/WangYihang/uio" +) + +func main() { + resultFd, err := uio.Open("s3://uio/output.txt" + + "?endpoint=127.0.0.1:9000" + + "&access_key=minioadmin" + + "&secret_key=minioadmin" + + "&insecure=true" + + "&mode=write", + ) + if err != nil { + slog.Error("failed to open result file", slog.String("error", err.Error())) + return + } + defer resultFd.Close() + for line := range utils.Cat( + "s3://uio/input.txt" + + "?endpoint=127.0.0.1:9000" + + "&access_key=minioadmin" + + "&secret_key=minioadmin" + + "&insecure=true" + + "&mode=read", + ) { + slog.Info("s3", slog.String("line", line)) + resultFd.Write([]byte(line + "\n")) + } +} diff --git a/examples/stdio/main.go b/examples/stdio/main.go index a91f5d4..e9f668a 100644 --- a/examples/stdio/main.go +++ b/examples/stdio/main.go @@ -1,21 +1,24 @@ package main import ( - "math/rand" - "time" + "fmt" "github.com/WangYihang/gojob" "github.com/WangYihang/gojob/pkg/utils" ) -type MyTask struct{} +type MyPrinterTask struct { + line string +} -func New() *MyTask { - return &MyTask{} +func New(line string) *MyPrinterTask { + return &MyPrinterTask{ + line: line, + } } -func (t *MyTask) Do() error { - time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) +func (t *MyPrinterTask) Do() error { + fmt.Println(t.line) return nil } @@ -24,14 +27,12 @@ func main() { gojob.WithNumWorkers(8), gojob.WithMaxRetries(4), gojob.WithMaxRuntimePerTaskSeconds(16), - gojob.WithNumShards(4), - gojob.WithShard(0), gojob.WithResultFilePath("-"), gojob.WithStatusFilePath("status.json"), ). Start() - for range utils.Cat("-") { - scheduler.Submit(New()) + for line := range utils.Cat("-") { + scheduler.Submit(New(line)) } scheduler.Wait() } diff --git a/go.mod b/go.mod index 732826d..579d536 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,6 @@ require ( github.com/WangYihang/uio v0.0.0-20240829150037-ce8fff04fa48 github.com/google/uuid v1.6.0 github.com/jessevdk/go-flags v1.5.0 - github.com/minio/minio-go/v7 v7.0.75 github.com/prometheus/client_golang v1.19.0 github.com/prometheus/client_model v0.6.1 ) @@ -21,11 +20,10 @@ require ( github.com/klauspost/compress v1.17.9 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/minio/md5-simd v1.1.2 // indirect + github.com/minio/minio-go/v7 v7.0.75 // indirect github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/rs/xid v1.5.0 // indirect - go.uber.org/multierr v1.10.0 // indirect - go.uber.org/zap v1.27.0 // indirect golang.org/x/crypto v0.24.0 // indirect golang.org/x/net v0.26.0 // indirect golang.org/x/sys v0.21.0 // indirect diff --git a/go.sum b/go.sum index 6e70b32..98f85db 100644 --- a/go.sum +++ b/go.sum @@ -1,17 +1,3 @@ -github.com/WangYihang/uio v0.0.0-20240823105731-198f3f476e5b h1:1I+0pNZhyX14QB88YhyfVt4v+GYYl++Iifl80Ia8dEI= -github.com/WangYihang/uio v0.0.0-20240823105731-198f3f476e5b/go.mod h1:HTnt+mSC0TQ3X11gfVeXq5MIbMnAWv/JYRbWzGM8b/w= -github.com/WangYihang/uio v0.0.0-20240823112936-bc0b8701f084 h1:qZZz52peiA/5yFM5HSejQPJHVOMqAZ+sEm/ewDxPzCc= -github.com/WangYihang/uio v0.0.0-20240823112936-bc0b8701f084/go.mod h1:0AO9XwgENfIEpS5lo7doL3CxIRXK1LkgLasTEl6xAG4= -github.com/WangYihang/uio v0.0.0-20240823113655-a867da1ceb34 h1:/nOu6QoE4SuMBrZGZKR5bDNSQofUv8QTJMdpdffd5+Q= -github.com/WangYihang/uio v0.0.0-20240823113655-a867da1ceb34/go.mod h1:0AO9XwgENfIEpS5lo7doL3CxIRXK1LkgLasTEl6xAG4= -github.com/WangYihang/uio v0.0.0-20240823113814-1c82dd5fe131 h1:haqL2GQ1dRDcOgTpJkBIJcI6j4xK0+ydrLuU3M3V3FU= -github.com/WangYihang/uio v0.0.0-20240823113814-1c82dd5fe131/go.mod h1:0AO9XwgENfIEpS5lo7doL3CxIRXK1LkgLasTEl6xAG4= -github.com/WangYihang/uio v0.0.0-20240823113922-10bd9224d4c1 h1:zKWH0lwVIssDDYd8GvPgxmtXKSxvf47v2H1EtBNs0WA= -github.com/WangYihang/uio v0.0.0-20240823113922-10bd9224d4c1/go.mod h1:0AO9XwgENfIEpS5lo7doL3CxIRXK1LkgLasTEl6xAG4= -github.com/WangYihang/uio v0.0.0-20240823114425-34e6b93251cd h1:1e3rdqd+Rlf5rMznGmV3zxmQmXAmSWfEUaS+H/li3p8= -github.com/WangYihang/uio v0.0.0-20240823114425-34e6b93251cd/go.mod h1:0AO9XwgENfIEpS5lo7doL3CxIRXK1LkgLasTEl6xAG4= -github.com/WangYihang/uio v0.0.0-20240826152024-f144bed245c7 h1:hcqowlVNExdNjr3M6yMR778GQJoVs6dxy/dybJBzF9o= -github.com/WangYihang/uio v0.0.0-20240826152024-f144bed245c7/go.mod h1:0AO9XwgENfIEpS5lo7doL3CxIRXK1LkgLasTEl6xAG4= github.com/WangYihang/uio v0.0.0-20240829150037-ce8fff04fa48 h1:PUBSQ5ANsRjj6g0jYPKVIo5m50iTS6gxY6TfCl29Rhk= github.com/WangYihang/uio v0.0.0-20240829150037-ce8fff04fa48/go.mod h1:T4KtUOTAaSgFJNlU8mEMi5tPSUM1P8Fe1SrjMUwbxZ4= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -57,12 +43,6 @@ github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= -go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= -go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= -go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= -go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= -go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= diff --git a/pkg/utils/io.go b/pkg/utils/io.go index 30bae37..8b0c424 100644 --- a/pkg/utils/io.go +++ b/pkg/utils/io.go @@ -2,19 +2,11 @@ package utils import ( "bufio" - "compress/gzip" - "context" - "fmt" "io" "log/slog" - "net/url" - "os" - "path/filepath" "strings" "github.com/WangYihang/uio" - "github.com/minio/minio-go/v7" - "github.com/minio/minio-go/v7/pkg/credentials" ) // Head takes a channel and returns a channel with the first n items @@ -86,7 +78,9 @@ func Cat(filePath string) <-chan string { scanner := bufio.NewScanner(file.(io.Reader)) // Change the type of file to io.Reader for scanner.Scan() { - out <- strings.TrimSpace(scanner.Text()) // Send the line to the channel + line := scanner.Text() + slog.Info("reading line", slog.String("line", line)) + out <- strings.TrimSpace(line) // Send the line to the channel } // Check for errors during Scan, excluding EOF @@ -105,115 +99,3 @@ func Count[T any](in <-chan T) (count int64) { } return count } - -type ReadDiscardCloser struct { - io.Writer - io.Reader -} - -func (wc ReadDiscardCloser) Close() error { - return nil -} - -func OpenFile(path string) (io.WriteCloser, error) { - protocol, err := ParseProtocol(path) - if err != nil { - return nil, err - } - switch protocol { - case "s3": - slog.Info("opening s3 file", slog.String("path", path)) - return OpenS3File(path) - case "file": - slog.Info("opening local file", slog.String("path", path)) - return OpenLocalFile(path) - default: - slog.Warn("unsupported protocol", slog.String("protocol", protocol)) - return nil, fmt.Errorf("unsupported protocol: %s", protocol) - } -} - -// OpenS3File opens a file from S3 -// e.g. s3://default/data.json?region=us-west-1&endpoint=s3.amazonaws.com&access_key=********************&secret_key=**************************************** -func OpenS3File(path string) (io.WriteCloser, error) { - // Parse the path - parsed, _ := url.Parse(path) - bucketName := parsed.Host - objectKey := strings.TrimLeft(parsed.Path, "/") - query := parsed.Query() - endpoint := query.Get("endpoint") - if endpoint == "" { - endpoint = "s3.amazonaws.com" - } - accessKey := query.Get("access_key") - secretKey := query.Get("secret_key") - region := query.Get("region") - slog.Info( - "parsed s3 path", - slog.String("access_key", accessKey), - slog.String("secret_key", secretKey), - slog.String("bucket", bucketName), - slog.String("object", objectKey), - slog.String("endpoint", endpoint), - slog.String("region", region), - ) - - // Download file from S3 into a temporary file - slog.Info("downloading file from s3", slog.String("bucket", bucketName), slog.String("object", objectKey)) - s3Client, err := minio.New(endpoint, &minio.Options{ - Creds: credentials.NewStaticV4(accessKey, secretKey, ""), - Secure: true, - Region: region, - }) - if err != nil { - return nil, err - } - reader, err := s3Client.GetObject(context.Background(), bucketName, objectKey, minio.GetObjectOptions{}) - if err != nil { - return nil, err - } - defer reader.Close() - - fd, err := os.CreateTemp("", "gojob-*") - if err != nil { - return nil, err - } - defer fd.Close() - - _, err = io.Copy(fd, reader) - if err != nil { - return nil, err - } - - // Open the temporary file - slog.Info("opening local file", slog.String("path", fd.Name())) - return OpenLocalFile(fd.Name()) -} - -func OpenLocalFile(path string) (io.ReadWriteCloser, error) { - switch path { - case "-": - return ReadDiscardCloser{Writer: os.Stdout, Reader: os.Stdin}, nil - case "": - // bug: input file can not be empty - return ReadDiscardCloser{Writer: io.Discard}, nil - default: - // Create folder - dir := filepath.Dir(path) - if err := os.MkdirAll(dir, 0755); err != nil { - return nil, err - } - // Open file - // Check the file extension, if it is .gz, use gzip.NewReadWriter - fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644) - if strings.HasSuffix(path, ".gz") { - gzipFd, err := gzip.NewReader(fd) - if err != nil { - return nil, err - } - return ReadDiscardCloser{Writer: io.Discard, Reader: gzipFd}, nil - } else { - return fd, err - } - } -} diff --git a/scheduler.go b/scheduler.go index 477ce4e..029dc13 100644 --- a/scheduler.go +++ b/scheduler.go @@ -9,6 +9,7 @@ import ( "time" "github.com/WangYihang/gojob/pkg/utils" + "github.com/WangYihang/uio" "github.com/google/uuid" ) @@ -225,7 +226,7 @@ func WithMetadataFilePath(path string) schedulerOption { // chanRecorder records the channel to a given file path func chanRecorder[T *basicTask | Status | schedulerMetadata](path string, ch <-chan T, wg *sync.WaitGroup) { - fd, err := utils.OpenFile(path) + fd, err := uio.Open(path) if err != nil { slog.Error("error occurred while opening file", slog.String("path", path), slog.String("error", err.Error())) return