Skip to content

Commit

Permalink
feat: counting total number of tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
WangYihang committed Mar 14, 2024
1 parent f8fbc0e commit ac5fa2c
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 6 deletions.
2 changes: 2 additions & 0 deletions examples/complex-http-crawler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@ func init() {
}

func main() {
total := utils.Count(utils.Cat(opts.InputFilePath))
scheduler := gojob.NewScheduler().
SetNumWorkers(opts.NumWorkers).
SetMaxRetries(opts.MaxRetries).
SetMaxRuntimePerTaskSeconds(opts.MaxRuntimePerTaskSeconds).
SetNumShards(int64(opts.NumShards)).
SetShard(int64(opts.Shard)).
SetOutputFilePath(opts.OutputFilePath).
SetTotalTasks(total).
Start()
for line := range utils.Cat(opts.InputFilePath) {
scheduler.Submit(model.New(string(line)))
Expand Down
5 changes: 4 additions & 1 deletion examples/metadata/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ func (t *MyTask) Do() error {
}

func main() {
inputFilePath := "data/input.txt"
total := utils.Count(utils.Cat(inputFilePath))
scheduler := gojob.NewScheduler().
SetNumWorkers(8).
SetMaxRetries(4).
Expand All @@ -28,10 +30,11 @@ func main() {
SetShard(0).
SetOutputFilePath("data/output.txt").
SetStatusFilePath("data/output.status").
SetTotalTasks(total).
SetMetadata("a", "b").
SetMetadata("c", "d").
Start()
for range utils.Cat("data/input.txt") {
for range utils.Cat(inputFilePath) {
scheduler.Submit(New())
}
scheduler.Wait()
Expand Down
5 changes: 4 additions & 1 deletion examples/nopper/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ func (t *MyTask) Do() error {
}

func main() {
inputFilePath := "data/input.txt"
total := utils.Count(utils.Cat(inputFilePath))
scheduler := gojob.NewScheduler().
SetNumWorkers(8).
SetMaxRetries(4).
Expand All @@ -28,8 +30,9 @@ func main() {
SetShard(0).
SetOutputFilePath("data/output.txt").
SetStatusFilePath("data/output.status").
SetTotalTasks(total).
Start()
for range utils.Cat("data/input.txt") {
for range utils.Cat(inputFilePath) {
scheduler.Submit(New())
}
scheduler.Wait()
Expand Down
7 changes: 5 additions & 2 deletions examples/simple-http-crawler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type MyTask struct {
StatusCode int `json:"status_code"`
}

func New(url string) *MyTask {
func NewTask(url string) *MyTask {
return &MyTask{
Url: url,
}
Expand All @@ -29,15 +29,18 @@ func (t *MyTask) Do() error {
}

func main() {
inputFilePath := "data/input.txt"
total := utils.Count(utils.Cat(inputFilePath))
scheduler := gojob.NewScheduler().
SetNumWorkers(8).
SetMaxRetries(4).
SetMaxRuntimePerTaskSeconds(16).
SetNumShards(4).
SetShard(0).
SetTotalTasks(total).
Start()
for line := range utils.Cat("data/input.txt") {
scheduler.Submit(New(line))
scheduler.Submit(NewTask(line))
}
scheduler.Wait()
}
4 changes: 3 additions & 1 deletion examples/sleeper/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,17 @@ func (t *MyTask) Do() error {
}

func main() {
total := 256
scheduler := gojob.NewScheduler().
SetNumWorkers(8).
SetMaxRetries(4).
SetMaxRuntimePerTaskSeconds(16).
SetNumShards(4).
SetShard(0).
SetOutputFilePath("output.txt").
SetTotalTasks(int64(total)).
Start()
for i := 0; i < 256; i++ {
for i := 0; i < total; i++ {
scheduler.Submit(New(i, rand.Intn(10)))
}
scheduler.Wait()
Expand Down
8 changes: 8 additions & 0 deletions pkg/utils/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,11 @@ func Cat(filePath string) <-chan string {

return out
}

// Count takes a channel and returns the number of items
func Count[T any](in <-chan T) (count int64) {
for range in {
count++
}
return count
}
3 changes: 2 additions & 1 deletion scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,9 @@ func (s *Scheduler) SetMaxRuntimePerTaskSeconds(maxRuntimePerTaskSeconds int) *S
}

// SetTotalTasks sets the total number of tasks
func (s *Scheduler) SetTotalTasks(numTotalTasks int64) {
func (s *Scheduler) SetTotalTasks(numTotalTasks int64) *Scheduler {
s.NumTotalTasks.Store(numTotalTasks)
return s
}

// AddMetadata adds metadata
Expand Down

0 comments on commit ac5fa2c

Please sign in to comment.