Skip to content

Commit

Permalink
perf: reduce memory usage
Browse files Browse the repository at this point in the history
  • Loading branch information
WangYihang committed Jan 17, 2024
1 parent 843dca5 commit 9c7769f
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 49 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,6 @@ output.txt

# Goreleaser
dist/

# VSCode
.vscode/
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,15 @@ func (t *MyTask) Bytes() ([]byte, error) {
}

func (t *MyTask) NeedRetry() bool {
return t.NumTries <= MAX_TRIES
return t.NumTries < MAX_TRIES
}

func main() {
scheduler := gojob.NewScheduler(16, "output.txt")
scheduler := gojob.NewScheduler(1, "output.txt")
scheduler.Start()
for line := range gojob.Cat("input.txt") {
scheduler.Submit(New(line))
}
scheduler.Start()
scheduler.Wait()
}
```
Expand Down
5 changes: 3 additions & 2 deletions example/simple-http-crawler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ func (t *MyTask) NeedRetry() bool {
}

func main() {
scheduler := gojob.NewScheduler(16, "output.txt")
scheduler := gojob.NewScheduler(1, "output.txt")
scheduler.Start()
for line := range gojob.Cat("input.txt") {
scheduler.Submit(New(line))
}
scheduler.Start()
scheduler.Wait()
}
95 changes: 51 additions & 44 deletions gojob.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,72 +161,79 @@ type Task interface {

// Scheduler is a task scheduler
type Scheduler struct {
NumWorkers int
OutputFilePath string
TaskChan chan Task
TaskWaitGroup *sync.WaitGroup
WriterWaitGroup *sync.WaitGroup
NumWorkers int
OutputFilePath string
TaskChan chan Task
LogChan chan string
taskWg *sync.WaitGroup
logWg *sync.WaitGroup
}

// NewScheduler creates a new scheduler
func NewScheduler(numWorkers int, outputFilePath string) *Scheduler {
return &Scheduler{
NumWorkers: numWorkers,
TaskChan: make(chan Task, numWorkers),
OutputFilePath: outputFilePath,
TaskWaitGroup: &sync.WaitGroup{},
WriterWaitGroup: &sync.WaitGroup{},
NumWorkers: numWorkers,
OutputFilePath: outputFilePath,
TaskChan: make(chan Task, 1024),
LogChan: make(chan string, 1024),
taskWg: &sync.WaitGroup{},
logWg: &sync.WaitGroup{},
}
}

// Submit submits a task to the scheduler
func (s *Scheduler) Submit(task Task) {
s.TaskWaitGroup.Add(1)
go func() {
s.TaskChan <- task
}()
s.taskWg.Add(1)
s.TaskChan <- task
}

// Start starts the scheduler
func (s *Scheduler) Start() {
results := []chan string{}
for i := 0; i < s.NumWorkers; i++ {
results = append(results, s.Worker())
go s.Worker()
}
go s.Writer(Fanin(results), s.OutputFilePath)
s.TaskWaitGroup.Wait()
s.WriterWaitGroup.Wait()
go s.Writer()
}

func (s *Scheduler) Worker() chan string {
out := make(chan string)
go func() {
defer close(out)
for task := range s.TaskChan {
err := task.Do()
if err != nil && task.NeedRetry() {
s.Submit(task)
}
data, err := task.Bytes()
if err != nil {
slog.Error("error occured while serializing task", slog.String("error", err.Error()))
}
s.WriterWaitGroup.Add(1)
out <- string(data)
s.TaskWaitGroup.Done()
// Wait waits for all tasks to finish
func (s *Scheduler) Wait() {
s.taskWg.Wait()
close(s.TaskChan)
s.logWg.Wait()
close(s.LogChan)
}

// Worker is a worker
func (s *Scheduler) Worker() {
for task := range s.TaskChan {

// do task
err := task.Do()
// check if retry is needed
if err != nil && task.NeedRetry() {
go s.Submit(task)
}
}()
return out
// put log to log channel
data, err := task.Bytes()
if err != nil {
slog.Error("error occured while serializing task", slog.String("error", err.Error()))
}
s.logWg.Add(1)
s.LogChan <- string(data)
// notify task is done
s.taskWg.Done()
}
}

func (s *Scheduler) Writer(inputChan chan string, outputFilePath string) {
// Writer writes logs to file
func (s *Scheduler) Writer() {
var fd *os.File
var err error
if outputFilePath == "-" {
if s.OutputFilePath == "-" {
fd = os.Stdout
} else {
// Create folder if not exists
dir := filepath.Dir(outputFilePath)
dir := filepath.Dir(s.OutputFilePath)
if _, err := os.Stat(dir); os.IsNotExist(err) {
err = os.MkdirAll(dir, 0755)
if err != nil {
Expand All @@ -235,14 +242,14 @@ func (s *Scheduler) Writer(inputChan chan string, outputFilePath string) {
}
}
// Open file
fd, err = os.OpenFile(outputFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
fd, err = os.OpenFile(s.OutputFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
slog.Error("error occured while opening file", slog.String("path", outputFilePath), slog.String("error", err.Error()))
slog.Error("error occured while opening file", slog.String("path", s.OutputFilePath), slog.String("error", err.Error()))
return
}
}
for result := range inputChan {
for result := range s.LogChan {
fd.WriteString(result + "\n")
s.WriterWaitGroup.Done()
s.logWg.Done()
}
}

0 comments on commit 9c7769f

Please sign in to comment.