From 345aa971bc06e6f7ed674d6cb27c3ce9512e9b85 Mon Sep 17 00:00:00 2001 From: Yihang Wang Date: Tue, 27 Feb 2024 16:38:47 +0800 Subject: [PATCH] feat: add progress bar --- examples/nopper/main.go | 34 ++++++++++++++++++++++++++++++++++ examples/sleeper/main.go | 2 +- go.mod | 12 ++++++++++-- go.sum | 23 +++++++++++++++++++++-- gojob.go | 36 +++++------------------------------- gojob_test.go | 10 ---------- 6 files changed, 71 insertions(+), 46 deletions(-) create mode 100644 examples/nopper/main.go diff --git a/examples/nopper/main.go b/examples/nopper/main.go new file mode 100644 index 0000000..0b3779f --- /dev/null +++ b/examples/nopper/main.go @@ -0,0 +1,34 @@ +package main + +import ( + "math/rand" + "time" + + "github.com/WangYihang/gojob" +) + +type MyTask struct{} + +func New() *MyTask { + return &MyTask{} +} + +func (t *MyTask) Do() error { + time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) + return nil +} + +func main() { + scheduler := gojob.NewScheduler(). + SetNumWorkers(8). + SetMaxRetries(4). + SetMaxRuntimePerTaskSeconds(16). + SetNumShards(4). + SetShard(0). + SetOutputFilePath("test.txt"). + Start() + for i := 0; i < 257; i++ { + scheduler.Submit(New()) + } + scheduler.Wait() +} diff --git a/examples/sleeper/main.go b/examples/sleeper/main.go index 9865950..1f66b53 100644 --- a/examples/sleeper/main.go +++ b/examples/sleeper/main.go @@ -32,7 +32,7 @@ func main() { SetNumShards(4). SetShard(0). Start() - for i := 0; i < 16; i++ { + for i := 0; i < 256; i++ { scheduler.Submit(New(i, rand.Intn(10))) } scheduler.Wait() diff --git a/go.mod b/go.mod index ebdbd89..f426be4 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,14 @@ module github.com/WangYihang/gojob go 1.21 -require github.com/jessevdk/go-flags v1.5.0 +require ( + github.com/jessevdk/go-flags v1.5.0 + github.com/schollz/progressbar/v3 v3.14.2 +) -require golang.org/x/sys v0.1.0 // indirect +require ( + github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect + github.com/rivo/uniseg v0.4.7 // indirect + golang.org/x/sys v0.17.0 // indirect + golang.org/x/term v0.17.0 // indirect +) diff --git a/go.sum b/go.sum index f410f54..eacec87 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,24 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/jessevdk/go-flags v1.5.0 h1:1jKYvbxEjfUl0fmqTCOfonvskHHXMjBySTLW4y9LFvc= github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4= +github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db h1:62I3jR2EmQ4l5rM/4FEfDWcRD+abF5XlKShorW5LRoQ= +github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db/go.mod h1:l0dey0ia/Uv7NcFFVbCLtqEBQbrT4OCwCSKTEv6enCw= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= +github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/schollz/progressbar/v3 v3.14.2 h1:EducH6uNLIWsr560zSV1KrTeUb/wZGAHqyMFIEa99ks= +github.com/schollz/progressbar/v3 v3.14.2/go.mod h1:aQAZQnhF4JGFtRJiw/eobaXpsqpVQAftEQ+hLGXaRc4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U= -golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= +golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.17.0 h1:mkTF7LCd6WGJNL3K1Ad7kwxNfYAW6a8a8QqtMblp/4U= +golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= diff --git a/gojob.go b/gojob.go index 0805502..243cd17 100644 --- a/gojob.go +++ b/gojob.go @@ -10,6 +10,8 @@ import ( "sync" "sync/atomic" "time" + + "github.com/schollz/progressbar/v3" ) // Task is an interface that defines a task @@ -50,13 +52,11 @@ type Scheduler struct { Shard int64 IsStarted bool CurrentIndex atomic.Int64 - NumDoneTasks atomic.Int64 TaskChan chan *BasicTask LogChan chan string - DoneChan chan struct{} taskWg *sync.WaitGroup logWg *sync.WaitGroup - progressWg *sync.WaitGroup + progressBar *progressbar.ProgressBar } // NewScheduler creates a new scheduler @@ -69,13 +69,11 @@ func NewScheduler() *Scheduler { NumShards: 3, Shard: 1, IsStarted: false, - NumDoneTasks: atomic.Int64{}, TaskChan: make(chan *BasicTask), LogChan: make(chan string), - DoneChan: make(chan struct{}), taskWg: &sync.WaitGroup{}, logWg: &sync.WaitGroup{}, - progressWg: &sync.WaitGroup{}, + progressBar: progressbar.Default(-1), } return scheduler } @@ -153,40 +151,16 @@ func (s *Scheduler) Start() *Scheduler { go s.Worker() } go s.Writer() - s.progressWg.Add(1) - go s.Progress() s.IsStarted = true return s } -// Progress prints progress -func (s *Scheduler) Progress() { - previousNumDoneTasks := s.NumDoneTasks.Load() - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - defer s.progressWg.Done() - for { - select { - case <-ticker.C: - currentNumDoneTasks := s.NumDoneTasks.Load() - ops := currentNumDoneTasks - previousNumDoneTasks - slog.Info("progress", slog.String("status", "active"), slog.Int64("num_done_tasks", currentNumDoneTasks), slog.Int64("ops", ops)) - previousNumDoneTasks = currentNumDoneTasks - case <-s.DoneChan: - slog.Info("progress", slog.String("status", "done"), slog.Int64("num_done_tasks", s.NumDoneTasks.Load())) - return - } - } -} - // Wait waits for all tasks to finish func (s *Scheduler) Wait() { s.taskWg.Wait() close(s.TaskChan) s.logWg.Wait() close(s.LogChan) - close(s.DoneChan) - s.progressWg.Wait() } // Worker is a worker @@ -219,7 +193,7 @@ func (s *Scheduler) Worker() { } // Notify task is done s.taskWg.Done() - s.NumDoneTasks.Add(1) + s.progressBar.Add(1) } } diff --git a/gojob_test.go b/gojob_test.go index ea60573..adcc674 100644 --- a/gojob_test.go +++ b/gojob_test.go @@ -64,16 +64,6 @@ func TestRunWithTimeout(t *testing.T) { } } -func TestSchedulerSubmit(t *testing.T) { - scheduler := gojob.NewScheduler().SetNumShards(2).SetShard(1) - safeWriter := NewSafeWriter() - task := NewTask(1, safeWriter) - scheduler.Submit(task) - if scheduler.NumDoneTasks.Load() != 1 { - t.Errorf("Expected NumTasks to be 1, got %d", scheduler.NumDoneTasks.Load()) - } -} - func TestSharding(t *testing.T) { testcases := []struct { numShards int64