Skip to content

Commit

Permalink
feat: add progress bar
Browse files Browse the repository at this point in the history
  • Loading branch information
WangYihang committed Feb 27, 2024
1 parent 1fe2754 commit 345aa97
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 46 deletions.
34 changes: 34 additions & 0 deletions examples/nopper/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package main

import (
"math/rand"
"time"

"github.com/WangYihang/gojob"
)

type MyTask struct{}

func New() *MyTask {
return &MyTask{}
}

func (t *MyTask) Do() error {
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
return nil
}

func main() {
scheduler := gojob.NewScheduler().
SetNumWorkers(8).
SetMaxRetries(4).
SetMaxRuntimePerTaskSeconds(16).
SetNumShards(4).
SetShard(0).
SetOutputFilePath("test.txt").
Start()
for i := 0; i < 257; i++ {
scheduler.Submit(New())
}
scheduler.Wait()
}
2 changes: 1 addition & 1 deletion examples/sleeper/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func main() {
SetNumShards(4).
SetShard(0).
Start()
for i := 0; i < 16; i++ {
for i := 0; i < 256; i++ {
scheduler.Submit(New(i, rand.Intn(10)))
}
scheduler.Wait()
Expand Down
12 changes: 10 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@ module github.com/WangYihang/gojob

go 1.21

require github.com/jessevdk/go-flags v1.5.0
require (
github.com/jessevdk/go-flags v1.5.0
github.com/schollz/progressbar/v3 v3.14.2
)

require golang.org/x/sys v0.1.0 // indirect
require (
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect
github.com/rivo/uniseg v0.4.7 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/term v0.17.0 // indirect
)
23 changes: 21 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,24 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/jessevdk/go-flags v1.5.0 h1:1jKYvbxEjfUl0fmqTCOfonvskHHXMjBySTLW4y9LFvc=
github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4=
github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db h1:62I3jR2EmQ4l5rM/4FEfDWcRD+abF5XlKShorW5LRoQ=
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db/go.mod h1:l0dey0ia/Uv7NcFFVbCLtqEBQbrT4OCwCSKTEv6enCw=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/schollz/progressbar/v3 v3.14.2 h1:EducH6uNLIWsr560zSV1KrTeUb/wZGAHqyMFIEa99ks=
github.com/schollz/progressbar/v3 v3.14.2/go.mod h1:aQAZQnhF4JGFtRJiw/eobaXpsqpVQAftEQ+hLGXaRc4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.17.0 h1:mkTF7LCd6WGJNL3K1Ad7kwxNfYAW6a8a8QqtMblp/4U=
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
36 changes: 5 additions & 31 deletions gojob.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"sync"
"sync/atomic"
"time"

"github.com/schollz/progressbar/v3"
)

// Task is an interface that defines a task
Expand Down Expand Up @@ -50,13 +52,11 @@ type Scheduler struct {
Shard int64
IsStarted bool
CurrentIndex atomic.Int64
NumDoneTasks atomic.Int64
TaskChan chan *BasicTask
LogChan chan string
DoneChan chan struct{}
taskWg *sync.WaitGroup
logWg *sync.WaitGroup
progressWg *sync.WaitGroup
progressBar *progressbar.ProgressBar
}

// NewScheduler creates a new scheduler
Expand All @@ -69,13 +69,11 @@ func NewScheduler() *Scheduler {
NumShards: 3,
Shard: 1,
IsStarted: false,
NumDoneTasks: atomic.Int64{},
TaskChan: make(chan *BasicTask),
LogChan: make(chan string),
DoneChan: make(chan struct{}),
taskWg: &sync.WaitGroup{},
logWg: &sync.WaitGroup{},
progressWg: &sync.WaitGroup{},
progressBar: progressbar.Default(-1),
}
return scheduler
}
Expand Down Expand Up @@ -153,40 +151,16 @@ func (s *Scheduler) Start() *Scheduler {
go s.Worker()
}
go s.Writer()
s.progressWg.Add(1)
go s.Progress()
s.IsStarted = true
return s
}

// Progress prints progress
func (s *Scheduler) Progress() {
previousNumDoneTasks := s.NumDoneTasks.Load()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
defer s.progressWg.Done()
for {
select {
case <-ticker.C:
currentNumDoneTasks := s.NumDoneTasks.Load()
ops := currentNumDoneTasks - previousNumDoneTasks
slog.Info("progress", slog.String("status", "active"), slog.Int64("num_done_tasks", currentNumDoneTasks), slog.Int64("ops", ops))
previousNumDoneTasks = currentNumDoneTasks
case <-s.DoneChan:
slog.Info("progress", slog.String("status", "done"), slog.Int64("num_done_tasks", s.NumDoneTasks.Load()))
return
}
}
}

// Wait waits for all tasks to finish
func (s *Scheduler) Wait() {
s.taskWg.Wait()
close(s.TaskChan)
s.logWg.Wait()
close(s.LogChan)
close(s.DoneChan)
s.progressWg.Wait()
}

// Worker is a worker
Expand Down Expand Up @@ -219,7 +193,7 @@ func (s *Scheduler) Worker() {
}
// Notify task is done
s.taskWg.Done()
s.NumDoneTasks.Add(1)
s.progressBar.Add(1)
}
}

Expand Down
10 changes: 0 additions & 10 deletions gojob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,6 @@ func TestRunWithTimeout(t *testing.T) {
}
}

func TestSchedulerSubmit(t *testing.T) {
scheduler := gojob.NewScheduler().SetNumShards(2).SetShard(1)
safeWriter := NewSafeWriter()
task := NewTask(1, safeWriter)
scheduler.Submit(task)
if scheduler.NumDoneTasks.Load() != 1 {
t.Errorf("Expected NumTasks to be 1, got %d", scheduler.NumDoneTasks.Load())
}
}

func TestSharding(t *testing.T) {
testcases := []struct {
numShards int64
Expand Down

0 comments on commit 345aa97

Please sign in to comment.