Skip to content

Commit

Permalink
Deliver async reports in a goroutine pool
Browse files Browse the repository at this point in the history
  • Loading branch information
Angelo Marletta committed May 9, 2024
1 parent 4ae0bc0 commit 4eb61dc
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 9 deletions.
74 changes: 74 additions & 0 deletions v2/async_runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package bugsnag

import (
"context"
"errors"
"fmt"
"sync"
)

type AsyncRunner[T any] struct {
ch chan T
wg sync.WaitGroup
processFn ProcessFunc[T]
}

type ProcessFunc[T any] func(T)

var ErrChannelFull = errors.New("channel is full")

func NewAsyncRunner[T any](
ctx context.Context,
channelSize int,
workerCount int,
processFn ProcessFunc[T],
) (*AsyncRunner[T], error) {
if channelSize < 0 {
return nil, fmt.Errorf("channelSize must be non negative")
}

b := &AsyncRunner[T]{
ch: make(chan T, channelSize),
wg: sync.WaitGroup{},
processFn: processFn,
}

b.startWorkers(ctx, workerCount)
return b, nil
}

func (b *AsyncRunner[T]) Submit(v T) error {
select {
case b.ch <- v:
// success
return nil
default:
// channel is full
return ErrChannelFull
}
}

func (b *AsyncRunner[T]) Wait() {
b.wg.Wait()
}

func (b *AsyncRunner[T]) startWorkers(ctx context.Context, workerCount int) {
b.wg.Add(workerCount)
for i := 0; i < workerCount; i++ {
go func() {
defer b.wg.Done()
b.workerLoop(ctx)
}()
}
}

func (b *AsyncRunner[T]) workerLoop(ctx context.Context) {
for {
select {
case msg := <-b.ch:
b.processFn(msg)
case <-ctx.Done():
return
}
}
}
69 changes: 69 additions & 0 deletions v2/async_runner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package bugsnag

import (
"context"
"sync/atomic"
"testing"
"time"
)

func TestAsyncRunner(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
channelSize := 10
workerCount := 5
var i atomic.Int32
processFn := func(val int) {
i.Add(1)
}

runner, err := NewAsyncRunner(ctx, channelSize, workerCount, processFn)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}

if cap(runner.ch) != channelSize {
t.Errorf("Expected channel size of %v, but got %v", channelSize, cap(runner.ch))
}

if runner.processFn == nil {
t.Error("Expected processFn to be set, but it was nil")
}

runner.Submit(1)
runner.Submit(2)
runner.Submit(3)
time.Sleep(100 * time.Millisecond)
cancel()
runner.Wait()

if i.Load() != 3 {
t.Errorf("Expected i to be 3, but got %v", i.Load())
}
}

func TestAsyncRunnerChannelFull(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
channelSize := 10
workerCount := 5
processFn := func(_ int) {
time.Sleep(100 * time.Millisecond)
}

runner, err := NewAsyncRunner(ctx, channelSize, workerCount, processFn)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}

for i := 0; i < channelSize; i++ {
if err := runner.Submit(i); err != nil {
t.Errorf("Unexpected error: %v", err)
}
}

if err := runner.Submit(10); err != ErrChannelFull {
t.Errorf("Expected channel full error, but got %v", err)
}

cancel()
runner.Wait()
}
22 changes: 22 additions & 0 deletions v2/bugsnag.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ var DefaultSessionPublishInterval = 60 * time.Second
var defaultNotifier = Notifier{&Config, nil}
var sessionTracker sessions.SessionTracker

var asyncRunner *AsyncRunner[*payload]

// Configure Bugsnag. The only required setting is the APIKey, which can be
// obtained by clicking on "Settings" in your Bugsnag dashboard. This function
// is also responsible for installing the global panic handler, so it should be
Expand All @@ -51,6 +53,7 @@ func Configure(config Configuration) {
// Only do once in case the user overrides the default panichandler, and
// configures multiple times.
panicHandlerOnce.Do(Config.PanicHandler)
setupAsyncRunner()
}

// StartSession creates new context from the context.Context instance with
Expand Down Expand Up @@ -253,6 +256,8 @@ func init() {
Logger: log.New(os.Stdout, log.Prefix(), log.Flags()),
PanicHandler: defaultPanicHandler,
Transport: http.DefaultTransport,
NumGoroutines: 10,
MaxPendingReports: 1000,

flushSessionsOnRepanic: true,
})
Expand Down Expand Up @@ -282,3 +287,20 @@ func updateSessionConfig() {
Logger: Config.Logger,
})
}

func setupAsyncRunner() {
var err error
asyncRunner, err = NewAsyncRunner[*payload](
context.Background(),
Config.MaxPendingReports,
Config.NumGoroutines,
func(p *payload) {
if err := publisher.publishReport(p); err != nil {
p.logf("bugsnag/defaultReportPublisher.publishReport: %v", err)
}
},
)
if err != nil {
Config.Logger.Printf("ERROR: %v", err)
}
}
23 changes: 23 additions & 0 deletions v2/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
)

Expand Down Expand Up @@ -98,9 +99,19 @@ type Configuration struct {
// can be configured if you are in an environment
// that has stringent conditions on making http requests.
Transport http.RoundTripper

// Whether bugsnag should notify synchronously. This defaults to false which
// causes bugsnag-go to spawn a new goroutine for each notification.
Synchronous bool

// Number of goroutines to use for sending notifications in asynchronous
// mode. This defaults to 10.
NumGoroutines int

// The maximum number of reports that can be pending in asynchronous mode.
// This defaults to 1000.
MaxPendingReports int

// Whether the notifier should send all sessions recorded so far to Bugsnag
// when repanicking to ensure that no session information is lost in a
// fatal crash.
Expand Down Expand Up @@ -307,6 +318,18 @@ func (config *Configuration) loadEnv() {
if synchronous := os.Getenv("BUGSNAG_SYNCHRONOUS"); synchronous != "" {
envConfig.Synchronous = synchronous == "1"
}
if numGoroutines := os.Getenv("BUGSNAG_NUM_GOROUTINES"); numGoroutines != "" {
num, err := strconv.Atoi(numGoroutines)
if err != nil {
envConfig.NumGoroutines = num
}
}
if maxPendingReports := os.Getenv("BUGSNAG_MAX_PENDING_REPORTS"); maxPendingReports != "" {
max, err := strconv.Atoi(maxPendingReports)
if err != nil {
envConfig.MaxPendingReports = max
}
}
if disablePanics := os.Getenv("BUGSNAG_DISABLE_PANIC_HANDLER"); disablePanics == "1" {
envConfig.PanicHandler = func() {}
}
Expand Down
5 changes: 3 additions & 2 deletions v2/go.mod
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
module github.com/bugsnag/bugsnag-go/v2

go 1.15
go 1.18

require (
github.com/bitly/go-simplejson v0.5.1
github.com/bugsnag/panicwrap v1.3.4
github.com/google/uuid v1.6.0
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 // indirect
github.com/pkg/errors v0.9.1
)

require github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 // indirect
8 changes: 1 addition & 7 deletions v2/report_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,5 @@ func (*defaultReportPublisher) publishReport(p *payload) error {
return p.deliver()
}

go func(p *payload) {
if err := p.deliver(); err != nil {
// Ensure that any errors are logged if they occur in a goroutine.
p.logf("bugsnag/defaultReportPublisher.publishReport: %v", err)
}
}(p)
return nil
return asyncRunner.Submit(p)
}

0 comments on commit 4eb61dc

Please sign in to comment.