From 6f305afb96429ec66675e9b3e09521506e6eaee8 Mon Sep 17 00:00:00 2001 From: Yihang Wang Date: Wed, 28 Feb 2024 19:16:45 +0800 Subject: [PATCH] feat: support set metadata to scheduler --- examples/metadata/main.go | 38 +++++++++++++++++++++++++++++++++++++ scheduler.go | 40 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+) create mode 100644 examples/metadata/main.go diff --git a/examples/metadata/main.go b/examples/metadata/main.go new file mode 100644 index 0000000..483c7c6 --- /dev/null +++ b/examples/metadata/main.go @@ -0,0 +1,38 @@ +package main + +import ( + "math/rand" + "time" + + "github.com/WangYihang/gojob" + "github.com/WangYihang/gojob/pkg/util" +) + +type MyTask struct{} + +func New() *MyTask { + return &MyTask{} +} + +func (t *MyTask) Do() error { + time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) + return nil +} + +func main() { + scheduler := gojob.NewScheduler(). + SetNumWorkers(8). + SetMaxRetries(4). + SetMaxRuntimePerTaskSeconds(16). + SetNumShards(4). + SetShard(0). + SetOutputFilePath("data/output.txt"). + SetStatusFilePath("data/output.status"). + SetMetadata("a", "b"). + SetMetadata("c", "d"). + Start() + for range util.Cat("data/input.txt") { + scheduler.Submit(New()) + } + scheduler.Wait() +} diff --git a/scheduler.go b/scheduler.go index b99a7a8..ba543aa 100644 --- a/scheduler.go +++ b/scheduler.go @@ -20,6 +20,9 @@ type Scheduler struct { OutputFd io.WriteCloser StatusFilePath string StatusFd io.WriteCloser + MetadataFilePath string + MetadataFd io.WriteCloser + Metadata map[string]string MaxRetries int MaxRuntimePerTaskSeconds int NumShards int64 @@ -44,6 +47,9 @@ func NewScheduler() *Scheduler { OutputFd: os.Stdout, StatusFilePath: "-", StatusFd: os.Stderr, + MetadataFilePath: "-", + MetadataFd: os.Stderr, + Metadata: make(map[string]string), MaxRetries: 4, MaxRuntimePerTaskSeconds: 16, NumShards: 1, @@ -111,6 +117,17 @@ func (s *Scheduler) SetStatusFilePath(statusFilePath string) *Scheduler { return s } +// SetMetadataFilePath sets the metadata file path +func (s *Scheduler) SetMetadataFilePath(metadataFilePath string) *Scheduler { + s.MetadataFilePath = metadataFilePath + fd, err := FilePathToFd(s.MetadataFilePath) + if err != nil { + panic(err) + } + s.MetadataFd = fd + return s +} + // SetMaxRetries sets the max retries func (s *Scheduler) SetMaxRetries(maxRetries int) *Scheduler { if maxRetries <= 0 { @@ -129,10 +146,31 @@ func (s *Scheduler) SetMaxRuntimePerTaskSeconds(maxRuntimePerTaskSeconds int) *S return s } +// SetTotalTasks sets the total number of tasks func (s *Scheduler) SetTotalTasks(numTotalTasks int64) { s.NumTotalTasks.Store(numTotalTasks) } +// AddMetadata adds metadata +func (s *Scheduler) SetMetadata(key, value string) *Scheduler { + if s.IsStarted { + panic("cannot add metadata after starting") + } + s.Metadata[key] = value + return s +} + +// Save saves metadata +func (s *Scheduler) Save() { + data, err := json.Marshal(s.Metadata) + if err != nil { + slog.Error("error occured while serializing metadata", slog.String("error", err.Error())) + } else { + s.MetadataFd.Write(data) + s.MetadataFd.Write([]byte("\n")) + } +} + // Submit submits a task to the scheduler func (s *Scheduler) Submit(task Task) { if !s.IsStarted { @@ -151,6 +189,7 @@ func (s *Scheduler) Start() *Scheduler { if s.IsStarted { return s } + s.Save() for i := 0; i < s.NumWorkers; i++ { go s.Worker() } @@ -169,6 +208,7 @@ func (s *Scheduler) Wait() { close(s.LogChan) close(s.DoneChan) s.statusWg.Wait() + s.Save() } func (s *Scheduler) Status() Status {