Skip to content

Commit

Permalink
feat: add failed/succeed task count in status
Browse files Browse the repository at this point in the history
  • Loading branch information
WangYihang committed Mar 28, 2024
1 parent b2a490b commit 28a2b73
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 15 deletions.
22 changes: 10 additions & 12 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ type Scheduler struct {
Shard int64
IsStarted bool
CurrentIndex atomic.Int64
NumDoneTasks atomic.Int64
NumTotalTasks atomic.Int64
FailedTaskCount atomic.Int64
SucceedTaskCount atomic.Int64
TotalTaskCount atomic.Int64
TaskChan chan *BasicTask
LogChan chan string
DoneChan chan struct{}
Expand All @@ -54,8 +55,8 @@ func NewScheduler() *Scheduler {
Shard: 0,
IsStarted: false,
CurrentIndex: atomic.Int64{},
NumDoneTasks: atomic.Int64{},
NumTotalTasks: atomic.Int64{},
SucceedTaskCount: atomic.Int64{},
TotalTaskCount: atomic.Int64{},
TaskChan: make(chan *BasicTask),
LogChan: make(chan string),
DoneChan: make(chan struct{}),
Expand Down Expand Up @@ -176,7 +177,7 @@ func (s *Scheduler) SetTotalTasks(numTotalTasks int64) *Scheduler {
}

// Store the number of tasks for this shard
s.NumTotalTasks.Store(baseTasksPerShard)
s.TotalTaskCount.Store(baseTasksPerShard)
return s
}

Expand Down Expand Up @@ -240,12 +241,8 @@ func (s *Scheduler) Wait() {
s.MetadataFd.Close()
}

func (s *Scheduler) Status() Status {
return Status{
Timestamp: time.Now().Format(time.RFC3339),
NumDone: s.NumDoneTasks.Load(),
NumTotal: s.NumTotalTasks.Load(),
}
func (s *Scheduler) Status() *Status {
return NewStatus(s.FailedTaskCount.Load(), s.SucceedTaskCount.Load(), s.TotalTaskCount.Load())
}

// Worker is a worker
Expand All @@ -272,13 +269,14 @@ func (s *Scheduler) Worker() {
data, err := json.Marshal(task)
if err != nil {
slog.Error("error occured while serializing task", slog.String("error", err.Error()))
s.FailedTaskCount.Add(1)
} else {
s.logWg.Add(1)
s.LogChan <- string(data)
s.SucceedTaskCount.Add(1)
}
// Notify task is done
s.taskWg.Done()
s.NumDoneTasks.Add(1)
}
}

Expand Down
18 changes: 15 additions & 3 deletions status.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,21 @@ package gojob
import "encoding/json"

type Status struct {
Timestamp string `json:"timestamp"`
NumDone int64 `json:"num_done"`
NumTotal int64 `json:"num_total"`
Timestamp string `json:"timestamp"`
FailedTaskCount int64 `json:"num_failed"`
SucceedTaskCount int64 `json:"num_succeed"`
FinishedTaskCount int64 `json:"num_done"`
TotalTaskCount int64 `json:"num_total"`
}

func NewStatus(failedTaskCount, succeedTaskCount, totalTaskCount int64) *Status {
return &Status{
Timestamp: "",
FailedTaskCount: failedTaskCount,
SucceedTaskCount: succeedTaskCount,
FinishedTaskCount: failedTaskCount + succeedTaskCount,
TotalTaskCount: totalTaskCount,
}
}

func (s Status) String() string {
Expand Down

0 comments on commit 28a2b73

Please sign in to comment.