Skip to content

Commit

Permalink
feat: support set metadata to scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
WangYihang committed Feb 28, 2024
1 parent fb4b3da commit 6f305af
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 0 deletions.
38 changes: 38 additions & 0 deletions examples/metadata/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package main

import (
"math/rand"
"time"

"github.com/WangYihang/gojob"
"github.com/WangYihang/gojob/pkg/util"
)

type MyTask struct{}

func New() *MyTask {
return &MyTask{}
}

func (t *MyTask) Do() error {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return nil
}

func main() {
scheduler := gojob.NewScheduler().
SetNumWorkers(8).
SetMaxRetries(4).
SetMaxRuntimePerTaskSeconds(16).
SetNumShards(4).
SetShard(0).
SetOutputFilePath("data/output.txt").
SetStatusFilePath("data/output.status").
SetMetadata("a", "b").
SetMetadata("c", "d").
Start()
for range util.Cat("data/input.txt") {
scheduler.Submit(New())
}
scheduler.Wait()
}
40 changes: 40 additions & 0 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ type Scheduler struct {
OutputFd io.WriteCloser
StatusFilePath string
StatusFd io.WriteCloser
MetadataFilePath string
MetadataFd io.WriteCloser
Metadata map[string]string
MaxRetries int
MaxRuntimePerTaskSeconds int
NumShards int64
Expand All @@ -44,6 +47,9 @@ func NewScheduler() *Scheduler {
OutputFd: os.Stdout,
StatusFilePath: "-",
StatusFd: os.Stderr,
MetadataFilePath: "-",
MetadataFd: os.Stderr,
Metadata: make(map[string]string),
MaxRetries: 4,
MaxRuntimePerTaskSeconds: 16,
NumShards: 1,
Expand Down Expand Up @@ -111,6 +117,17 @@ func (s *Scheduler) SetStatusFilePath(statusFilePath string) *Scheduler {
return s
}

// SetMetadataFilePath sets the metadata file path
func (s *Scheduler) SetMetadataFilePath(metadataFilePath string) *Scheduler {
s.MetadataFilePath = metadataFilePath
fd, err := FilePathToFd(s.MetadataFilePath)
if err != nil {
panic(err)
}
s.MetadataFd = fd
return s
}

// SetMaxRetries sets the max retries
func (s *Scheduler) SetMaxRetries(maxRetries int) *Scheduler {
if maxRetries <= 0 {
Expand All @@ -129,10 +146,31 @@ func (s *Scheduler) SetMaxRuntimePerTaskSeconds(maxRuntimePerTaskSeconds int) *S
return s
}

// SetTotalTasks sets the total number of tasks
func (s *Scheduler) SetTotalTasks(numTotalTasks int64) {
s.NumTotalTasks.Store(numTotalTasks)
}

// AddMetadata adds metadata
func (s *Scheduler) SetMetadata(key, value string) *Scheduler {
if s.IsStarted {
panic("cannot add metadata after starting")
}
s.Metadata[key] = value
return s
}

// Save saves metadata
func (s *Scheduler) Save() {
data, err := json.Marshal(s.Metadata)
if err != nil {
slog.Error("error occured while serializing metadata", slog.String("error", err.Error()))
} else {
s.MetadataFd.Write(data)
s.MetadataFd.Write([]byte("\n"))
}
}

// Submit submits a task to the scheduler
func (s *Scheduler) Submit(task Task) {
if !s.IsStarted {
Expand All @@ -151,6 +189,7 @@ func (s *Scheduler) Start() *Scheduler {
if s.IsStarted {
return s
}
s.Save()
for i := 0; i < s.NumWorkers; i++ {
go s.Worker()
}
Expand All @@ -169,6 +208,7 @@ func (s *Scheduler) Wait() {
close(s.LogChan)
close(s.DoneChan)
s.statusWg.Wait()
s.Save()
}

func (s *Scheduler) Status() Status {
Expand Down

0 comments on commit 6f305af

Please sign in to comment.