-
Notifications
You must be signed in to change notification settings - Fork 69
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Deliver async reports from a goroutine pool
- Loading branch information
Angelo Marletta
committed
May 9, 2024
1 parent
4ae0bc0
commit 95334ce
Showing
6 changed files
with
192 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
package bugsnag | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"sync" | ||
) | ||
|
||
type AsyncRunner[T any] struct { | ||
ch chan T | ||
wg sync.WaitGroup | ||
processFn ProcessFunc[T] | ||
} | ||
|
||
type ProcessFunc[T any] func(T) | ||
|
||
var ErrChannelFull = errors.New("channel is full") | ||
|
||
func NewAsyncRunner[T any]( | ||
ctx context.Context, | ||
channelSize int, | ||
workerCount int, | ||
processFn ProcessFunc[T], | ||
) (*AsyncRunner[T], error) { | ||
if channelSize < 0 { | ||
return nil, fmt.Errorf("channelSize must be non negative") | ||
} | ||
|
||
b := &AsyncRunner[T]{ | ||
ch: make(chan T, channelSize), | ||
wg: sync.WaitGroup{}, | ||
processFn: processFn, | ||
} | ||
|
||
b.startWorkers(ctx, workerCount) | ||
return b, nil | ||
} | ||
|
||
func (b *AsyncRunner[T]) Submit(v T) error { | ||
select { | ||
case b.ch <- v: | ||
// success | ||
return nil | ||
default: | ||
// channel is full | ||
return ErrChannelFull | ||
} | ||
} | ||
|
||
func (b *AsyncRunner[T]) Wait() { | ||
b.wg.Wait() | ||
} | ||
|
||
func (b *AsyncRunner[T]) startWorkers(ctx context.Context, workerCount int) { | ||
b.wg.Add(workerCount) | ||
for i := 0; i < workerCount; i++ { | ||
go func() { | ||
defer b.wg.Done() | ||
b.workerLoop(ctx) | ||
}() | ||
} | ||
} | ||
|
||
func (b *AsyncRunner[T]) workerLoop(ctx context.Context) { | ||
for { | ||
select { | ||
case msg := <-b.ch: | ||
b.processFn(msg) | ||
case <-ctx.Done(): | ||
return | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
package bugsnag | ||
|
||
import ( | ||
"context" | ||
"sync/atomic" | ||
"testing" | ||
"time" | ||
) | ||
|
||
func TestAsyncRunner(t *testing.T) { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
channelSize := 10 | ||
workerCount := 5 | ||
var i atomic.Int32 | ||
processFn := func(val int) { | ||
i.Add(1) | ||
} | ||
|
||
runner, err := NewAsyncRunner(ctx, channelSize, workerCount, processFn) | ||
if err != nil { | ||
t.Errorf("Unexpected error: %v", err) | ||
} | ||
|
||
if cap(runner.ch) != channelSize { | ||
t.Errorf("Expected channel size of %v, but got %v", channelSize, cap(runner.ch)) | ||
} | ||
|
||
if runner.processFn == nil { | ||
t.Error("Expected processFn to be set, but it was nil") | ||
} | ||
|
||
runner.Submit(1) | ||
runner.Submit(2) | ||
runner.Submit(3) | ||
time.Sleep(100 * time.Millisecond) | ||
cancel() | ||
runner.Wait() | ||
|
||
if i.Load() != 3 { | ||
t.Errorf("Expected i to be 3, but got %v", i.Load()) | ||
} | ||
} | ||
|
||
func TestAsyncRunnerChannelFull(t *testing.T) { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
channelSize := 10 | ||
workerCount := 5 | ||
processFn := func(_ int) { | ||
time.Sleep(100 * time.Millisecond) | ||
} | ||
|
||
runner, err := NewAsyncRunner(ctx, channelSize, workerCount, processFn) | ||
if err != nil { | ||
t.Errorf("Unexpected error: %v", err) | ||
} | ||
|
||
for i := 0; i < channelSize; i++ { | ||
if err := runner.Submit(i); err != nil { | ||
t.Errorf("Unexpected error: %v", err) | ||
} | ||
} | ||
|
||
if err := runner.Submit(10); err != ErrChannelFull { | ||
t.Errorf("Expected channel full error, but got %v", err) | ||
} | ||
|
||
cancel() | ||
runner.Wait() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,11 +1,12 @@ | ||
module github.com/bugsnag/bugsnag-go/v2 | ||
|
||
go 1.15 | ||
go 1.18 | ||
|
||
require ( | ||
github.com/bitly/go-simplejson v0.5.1 | ||
github.com/bugsnag/panicwrap v1.3.4 | ||
github.com/google/uuid v1.6.0 | ||
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 // indirect | ||
github.com/pkg/errors v0.9.1 | ||
) | ||
|
||
require github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 // indirect |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters