Skip to content

Commit

Permalink
feat: write metadata/status file to stderr
Browse files Browse the repository at this point in the history
  • Loading branch information
WangYihang committed Mar 14, 2024
1 parent 870fafb commit f8fbc0e
Show file tree
Hide file tree
Showing 14 changed files with 62 additions and 21 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"net/http"

"github.com/WangYihang/gojob"
"github.com/WangYihang/gojob/pkg/util"
"github.com/WangYihang/gojob/pkg/utils"
)

type MyTask struct {
Expand Down
4 changes: 2 additions & 2 deletions examples/complex-http-crawler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (

"github.com/WangYihang/gojob"
"github.com/WangYihang/gojob/examples/complex-http-crawler/pkg/model"
"github.com/WangYihang/gojob/pkg/util"
"github.com/WangYihang/gojob/pkg/utils"
"github.com/jessevdk/go-flags"
)

Expand Down Expand Up @@ -37,7 +37,7 @@ func main() {
SetShard(int64(opts.Shard)).
SetOutputFilePath(opts.OutputFilePath).
Start()
for line := range util.Cat(opts.InputFilePath) {
for line := range utils.Cat(opts.InputFilePath) {
scheduler.Submit(model.New(string(line)))
}
scheduler.Wait()
Expand Down
4 changes: 2 additions & 2 deletions examples/metadata/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"time"

"github.com/WangYihang/gojob"
"github.com/WangYihang/gojob/pkg/util"
"github.com/WangYihang/gojob/pkg/utils"
)

type MyTask struct{}
Expand All @@ -31,7 +31,7 @@ func main() {
SetMetadata("a", "b").
SetMetadata("c", "d").
Start()
for range util.Cat("data/input.txt") {
for range utils.Cat("data/input.txt") {
scheduler.Submit(New())
}
scheduler.Wait()
Expand Down
4 changes: 2 additions & 2 deletions examples/nopper/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"time"

"github.com/WangYihang/gojob"
"github.com/WangYihang/gojob/pkg/util"
"github.com/WangYihang/gojob/pkg/utils"
)

type MyTask struct{}
Expand All @@ -29,7 +29,7 @@ func main() {
SetOutputFilePath("data/output.txt").
SetStatusFilePath("data/output.status").
Start()
for range util.Cat("data/input.txt") {
for range utils.Cat("data/input.txt") {
scheduler.Submit(New())
}
scheduler.Wait()
Expand Down
4 changes: 2 additions & 2 deletions examples/simple-http-crawler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"net/http"

"github.com/WangYihang/gojob"
"github.com/WangYihang/gojob/pkg/util"
"github.com/WangYihang/gojob/pkg/utils"
)

type MyTask struct {
Expand Down Expand Up @@ -36,7 +36,7 @@ func main() {
SetNumShards(4).
SetShard(0).
Start()
for line := range util.Cat("data/input.txt") {
for line := range utils.Cat("data/input.txt") {
scheduler.Submit(New(line))
}
scheduler.Wait()
Expand Down
1 change: 1 addition & 0 deletions examples/sleeper/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func main() {
SetMaxRuntimePerTaskSeconds(16).
SetNumShards(4).
SetShard(0).
SetOutputFilePath("output.txt").
Start()
for i := 0; i < 256; i++ {
scheduler.Submit(New(i, rand.Intn(10)))
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/capture.go → pkg/utils/capture.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package util
package utils

import (
"bytes"
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/channel.go → pkg/utils/channel.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package util
package utils

import "sync"

Expand Down
2 changes: 1 addition & 1 deletion pkg/util/io.go → pkg/utils/io.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package util
package utils

import (
"bufio"
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/mapreduce.go → pkg/utils/mapreduce.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package util
package utils

// Map takes a channel and returns a channel with the items that pass the filter
func Map[T interface{}, U interface{}](in chan T, f func(T) U) chan U {
Expand Down
40 changes: 40 additions & 0 deletions pkg/utils/tee.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package utils

import (
"io"
)

// TeeWriterCloser structure, used to write to and close multiple io.WriteClosers
type TeeWriterCloser struct {
writers []io.WriteCloser
}

// NewTeeWriterCloser creates a new instance of TeeWriterCloser
func NewTeeWriterCloser(writers ...io.WriteCloser) *TeeWriterCloser {
return &TeeWriterCloser{
writers: writers,
}
}

// Write implements the io.Writer interface
// It writes the given byte slice to all the contained writers
func (t *TeeWriterCloser) Write(p []byte) (n int, err error) {
for _, w := range t.writers {
n, err = w.Write(p)
if err != nil {
return
}
}
return len(p), nil
}

// Close implements the io.Closer interface
// It closes all the contained writers and returns the first encountered error
func (t *TeeWriterCloser) Close() (err error) {
for _, w := range t.writers {
if cerr := w.Close(); cerr != nil && err == nil {
err = cerr // Record the first encountered error
}
}
return
}
2 changes: 1 addition & 1 deletion pkg/util/timeout.go → pkg/utils/timeout.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package util
package utils

import (
"context"
Expand Down
6 changes: 3 additions & 3 deletions pkg/util/timeout_test.go → pkg/utils/timeout_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package util_test
package utils_test

import (
"testing"
"time"

"github.com/WangYihang/gojob/pkg/util"
"github.com/WangYihang/gojob/pkg/utils"
)

func TestRunWithTimeout(t *testing.T) {
Expand All @@ -13,7 +13,7 @@ func TestRunWithTimeout(t *testing.T) {
return nil
}

err := util.RunWithTimeout(task, 1*time.Second)
err := utils.RunWithTimeout(task, 1*time.Second)
if err == nil {
t.Errorf("Expected timeout error, got nil")
}
Expand Down
8 changes: 4 additions & 4 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"sync/atomic"
"time"

"github.com/WangYihang/gojob/pkg/util"
"github.com/WangYihang/gojob/pkg/utils"
"github.com/google/uuid"
)

Expand Down Expand Up @@ -120,7 +120,7 @@ func (s *Scheduler) SetStatusFilePath(statusFilePath string) *Scheduler {
if err != nil {
panic(err)
}
s.StatusFd = fd
s.StatusFd = utils.NewTeeWriterCloser(fd, os.Stderr)
return s
}

Expand All @@ -131,7 +131,7 @@ func (s *Scheduler) SetMetadataFilePath(metadataFilePath string) *Scheduler {
if err != nil {
panic(err)
}
s.MetadataFd = fd
s.MetadataFd = utils.NewTeeWriterCloser(fd, os.Stderr)
return s
}

Expand Down Expand Up @@ -237,7 +237,7 @@ func (s *Scheduler) Worker() {
task.NumTries++
task.FinishedAt = time.Now().UnixMicro()
}()
return util.RunWithTimeout(task.Task.Do, time.Duration(s.MaxRuntimePerTaskSeconds)*time.Second)
return utils.RunWithTimeout(task.Task.Do, time.Duration(s.MaxRuntimePerTaskSeconds)*time.Second)
}()
if err != nil {
task.Error = err.Error()
Expand Down

0 comments on commit f8fbc0e

Please sign in to comment.