Skip to content

Commit

Permalink
fix: incomplete write
Browse files Browse the repository at this point in the history
  • Loading branch information
WangYihang committed Jan 16, 2024
1 parent 70b93fd commit 2cc758c
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 17 deletions.
1 change: 1 addition & 0 deletions example/complex-http-crawler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ func main() {
scheduler.Submit(model.New(string(line)))
}
scheduler.Start()
scheduler.Wait()
}
2 changes: 1 addition & 1 deletion example/simple-http-crawler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (t *MyTask) Bytes() ([]byte, error) {
}

func (t *MyTask) NeedRetry() bool {
return t.NumTries <= MAX_TRIES
return t.NumTries < MAX_TRIES
}

func main() {
Expand Down
35 changes: 19 additions & 16 deletions gojob.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,26 +161,27 @@ type Task interface {

// Scheduler is a task scheduler
type Scheduler struct {
NumWorkers int
OutputFilePath string
TaskChan chan Task
WaitGroup *sync.WaitGroup
WriterChan chan string
NumWorkers int
OutputFilePath string
TaskChan chan Task
TaskWaitGroup *sync.WaitGroup
WriterWaitGroup *sync.WaitGroup
}

// NewScheduler creates a new scheduler
func NewScheduler(numWorkers int, outputFilePath string) *Scheduler {
return &Scheduler{
NumWorkers: numWorkers,
TaskChan: make(chan Task, numWorkers),
OutputFilePath: outputFilePath,
WaitGroup: &sync.WaitGroup{},
NumWorkers: numWorkers,
TaskChan: make(chan Task, numWorkers),
OutputFilePath: outputFilePath,
TaskWaitGroup: &sync.WaitGroup{},
WriterWaitGroup: &sync.WaitGroup{},
}
}

// Submit submits a task to the scheduler
func (s *Scheduler) Submit(task Task) {
s.WaitGroup.Add(1)
s.TaskWaitGroup.Add(1)
go func() {
s.TaskChan <- task
}()
Expand All @@ -192,12 +193,12 @@ func (s *Scheduler) Start() {
for i := 0; i < s.NumWorkers; i++ {
results = append(results, s.Worker())
}
s.WriterChan = Fanin(results)
go s.Writer(s.OutputFilePath)
go s.Writer(Fanin(results), s.OutputFilePath)
}

func (s *Scheduler) Wait() {
s.WaitGroup.Wait()
s.TaskWaitGroup.Wait()
s.WriterWaitGroup.Wait()
}

func (s *Scheduler) Worker() chan string {
Expand All @@ -213,14 +214,15 @@ func (s *Scheduler) Worker() chan string {
if err != nil {
slog.Error("error occured while serializing task", slog.String("error", err.Error()))
}
s.WriterWaitGroup.Add(1)
out <- string(data)
s.WaitGroup.Done()
s.TaskWaitGroup.Done()
}
}()
return out
}

func (s *Scheduler) Writer(outputFilePath string) {
func (s *Scheduler) Writer(inputChan chan string, outputFilePath string) {
var fd *os.File
var err error
if outputFilePath == "-" {
Expand All @@ -242,7 +244,8 @@ func (s *Scheduler) Writer(outputFilePath string) {
return
}
}
for result := range s.WriterChan {
for result := range inputChan {
fd.WriteString(result + "\n")
s.WriterWaitGroup.Done()
}
}

0 comments on commit 2cc758c

Please sign in to comment.