Skip to content

Commit

Permalink
Add rate limiting for async delivery by using a goroutine with consum…
Browse files Browse the repository at this point in the history
…er channel
  • Loading branch information
DariaKunoichi committed Aug 2, 2024
1 parent 245bce0 commit 1bf0498
Show file tree
Hide file tree
Showing 10 changed files with 132 additions and 36 deletions.
4 changes: 3 additions & 1 deletion features/fixtures/app/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
ARG GO_VERSION
FROM golang:${GO_VERSION}-alpine

RUN apk update && apk upgrade && apk add git bash
RUN apk update && apk upgrade && apk add git bash build-base

ENV GOPATH /app

Expand All @@ -28,6 +28,8 @@ WORKDIR /app/src/test
# Skip on old versions of Go which pre-date modules
RUN if [[ $GO_VERSION != '1.11' && $GO_VERSION != '1.12' ]]; then \
go mod init && go mod tidy; \
echo "replace github.com/bugsnag/bugsnag-go/v2 => /app/src/github.com/bugsnag/bugsnag-go/v2" >> go.mod; \
go mod tidy; \
fi

RUN chmod +x run.sh
Expand Down
15 changes: 9 additions & 6 deletions features/fixtures/app/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ import (
bugsnag "github.com/bugsnag/bugsnag-go/v2"
)

func configureBasicBugsnag(testcase string) {
func configureBasicBugsnag(testcase string, ctx context.Context) {
config := bugsnag.Configuration{
APIKey: os.Getenv("API_KEY"),
AppVersion: os.Getenv("APP_VERSION"),
AppType: os.Getenv("APP_TYPE"),
Hostname: os.Getenv("HOSTNAME"),
APIKey: os.Getenv("API_KEY"),
AppVersion: os.Getenv("APP_VERSION"),
AppType: os.Getenv("APP_TYPE"),
Hostname: os.Getenv("HOSTNAME"),
MainContext: ctx,
}

if notifyReleaseStages := os.Getenv("NOTIFY_RELEASE_STAGES"); notifyReleaseStages != "" {
Expand Down Expand Up @@ -63,11 +64,13 @@ func configureBasicBugsnag(testcase string) {
}

func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

test := flag.String("test", "handled", "what the app should send, either handled, unhandled, session, autonotify")
flag.Parse()

configureBasicBugsnag(*test)
configureBasicBugsnag(*test, ctx)
time.Sleep(100 * time.Millisecond) // Ensure tests are less flaky by ensuring the start-up session gets sent

switch *test {
Expand Down
4 changes: 3 additions & 1 deletion features/fixtures/autoconfigure/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
ARG GO_VERSION
FROM golang:${GO_VERSION}-alpine

RUN apk update && apk upgrade && apk add git bash
RUN apk update && apk upgrade && apk add git bash build-base

ENV GOPATH /app

Expand All @@ -28,6 +28,8 @@ WORKDIR /app/src/test
# Skip on old versions of Go which pre-date modules
RUN if [[ $GO_VERSION != '1.11' && $GO_VERSION != '1.12' ]]; then \
go mod init && go mod tidy; \
echo "replace github.com/bugsnag/bugsnag-go/v2 => /app/src/github.com/bugsnag/bugsnag-go/v2" >> go.mod; \
go mod tidy; \
fi

RUN chmod +x run.sh
Expand Down
4 changes: 3 additions & 1 deletion features/fixtures/net_http/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ FROM golang:${GO_VERSION}-alpine

RUN apk update && \
apk upgrade && \
apk add git
apk add git build-base

ENV GOPATH /app

Expand All @@ -30,4 +30,6 @@ WORKDIR /app/src/test
# Skip on old versions of Go which pre-date modules
RUN if [[ $GO_VERSION != '1.11' && $GO_VERSION != '1.12' ]]; then \
go mod init && go mod tidy; \
echo "replace github.com/bugsnag/bugsnag-go/v2 => /app/src/github.com/bugsnag/bugsnag-go/v2" >> go.mod; \
go mod tidy; \
fi
13 changes: 8 additions & 5 deletions features/fixtures/net_http/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (
)

func main() {
configureBasicBugsnag()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
configureBasicBugsnag(ctx)

http.HandleFunc("/handled", handledError)
http.HandleFunc("/autonotify-then-recover", unhandledCrash)
Expand All @@ -40,16 +42,17 @@ func recoverWrap(h http.Handler) http.Handler {
})
}

func configureBasicBugsnag() {
func configureBasicBugsnag(ctx context.Context) {
config := bugsnag.Configuration{
APIKey: os.Getenv("API_KEY"),
Endpoints: bugsnag.Endpoints{
Notify: os.Getenv("BUGSNAG_ENDPOINT"),
Sessions: os.Getenv("BUGSNAG_ENDPOINT"),
},
AppVersion: os.Getenv("APP_VERSION"),
AppType: os.Getenv("APP_TYPE"),
Hostname: os.Getenv("HOSTNAME"),
AppVersion: os.Getenv("APP_VERSION"),
AppType: os.Getenv("APP_TYPE"),
Hostname: os.Getenv("HOSTNAME"),
MainContext: ctx,
}

if notifyReleaseStages := os.Getenv("NOTIFY_RELEASE_STAGES"); notifyReleaseStages != "" {
Expand Down
10 changes: 5 additions & 5 deletions features/handled.feature
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ Scenario: Sending an event using a callback to modify report contents
And the event "severityReason.type" equals "userCallbackSetSeverity"
And the event "context" equals "nonfatal.go:14"
And the "file" of stack frame 0 equals "main.go"
And stack frame 0 contains a local function spanning 242 to 248
And stack frame 0 contains a local function spanning 245 to 253
And the "file" of stack frame 1 equals ">insertion<"
And the "lineNumber" of stack frame 1 equals 0

Expand All @@ -50,7 +50,7 @@ Scenario: Marking an error as unhandled in a callback
And the event "severityReason.type" equals "userCallbackSetSeverity"
And the event "severityReason.unhandledOverridden" is true
And the "file" of stack frame 0 equals "main.go"
And stack frame 0 contains a local function spanning 254 to 257
And stack frame 0 contains a local function spanning 257 to 262

Scenario: Unwrapping the causes of a handled error
When I run the go service "app" with the test case "nested-error"
Expand All @@ -59,12 +59,12 @@ Scenario: Unwrapping the causes of a handled error
And the event "unhandled" is false
And the event "severity" equals "warning"
And the event "exceptions.0.message" equals "terminate process"
And the "lineNumber" of stack frame 0 equals 292
And the "lineNumber" of stack frame 0 equals 295
And the "file" of stack frame 0 equals "main.go"
And the "method" of stack frame 0 equals "nestedHandledError"
And the event "exceptions.1.message" equals "login failed"
And the event "exceptions.1.stacktrace.0.file" equals "main.go"
And the event "exceptions.1.stacktrace.0.lineNumber" equals 312
And the event "exceptions.1.stacktrace.0.lineNumber" equals 315
And the event "exceptions.2.message" equals "invalid token"
And the event "exceptions.2.stacktrace.0.file" equals "main.go"
And the event "exceptions.2.stacktrace.0.lineNumber" equals 320
And the event "exceptions.2.stacktrace.0.lineNumber" equals 323
14 changes: 11 additions & 3 deletions v2/bugsnag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ import (
"github.com/bugsnag/bugsnag-go/v2/sessions"
)

// ATTENTION - tests in this file are changing global state variables
// like default config or default report publisher
// TAKE CARE to reset them to default after testcase!

// The line numbers of this method are used in tests.
// If you move this function you'll have to change tests
func crashyHandler(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -132,7 +136,7 @@ func TestNotify(t *testing.T) {
}

exception := getIndex(event, "exceptions", 0)
verifyExistsInStackTrace(t, exception, &StackFrame{File: "bugsnag_test.go", Method: "TestNotify", LineNumber: 98, InProject: true})
verifyExistsInStackTrace(t, exception, &StackFrame{File: "bugsnag_test.go", Method: "TestNotify", LineNumber: 102, InProject: true})
}

type testPublisher struct {
Expand All @@ -144,6 +148,9 @@ func (tp *testPublisher) publishReport(p *payload) error {
return nil
}

func (tp *testPublisher) setMainProgramContext(context.Context) {
}

func TestNotifySyncThenAsync(t *testing.T) {
ts, _ := setup()
defer ts.Close()
Expand All @@ -152,7 +159,7 @@ func TestNotifySyncThenAsync(t *testing.T) {

pub := new(testPublisher)
publisher = pub
defer func() { publisher = new(defaultReportPublisher) }()
defer func() { publisher = newPublisher() }()

Notify(fmt.Errorf("oopsie"))
if pub.sync {
Expand All @@ -175,6 +182,7 @@ func TestHandlerFunc(t *testing.T) {
defer eventserver.Close()
Configure(generateSampleConfig(eventserver.URL))

// NOTE - this testcase will print a panic in verbose mode
t.Run("unhandled", func(st *testing.T) {
sessionTracker = nil
startSessionTracking()
Expand Down Expand Up @@ -315,7 +323,7 @@ func TestHandler(t *testing.T) {
}

exception := getIndex(event, "exceptions", 0)
verifyExistsInStackTrace(t, exception, &StackFrame{File: "bugsnag_test.go", Method: "crashyHandler", InProject: true, LineNumber: 24})
verifyExistsInStackTrace(t, exception, &StackFrame{File: "bugsnag_test.go", Method: "crashyHandler", InProject: true, LineNumber: 28})
}

func TestAutoNotify(t *testing.T) {
Expand Down
11 changes: 11 additions & 0 deletions v2/configuration.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bugsnag

import (
"context"
"log"
"net/http"
"os"
Expand Down Expand Up @@ -101,6 +102,12 @@ type Configuration struct {
// Whether bugsnag should notify synchronously. This defaults to false which
// causes bugsnag-go to spawn a new goroutine for each notification.
Synchronous bool

// Context created in the main program
// Used in event delivery - after this context is marked Done
// the event sending goroutine will switch to a graceful shutdown
// and will try to send any remaining events.
MainContext context.Context
// Whether the notifier should send all sessions recorded so far to Bugsnag
// when repanicking to ensure that no session information is lost in a
// fatal crash.
Expand Down Expand Up @@ -160,6 +167,10 @@ func (config *Configuration) update(other *Configuration) *Configuration {
if other.Synchronous {
config.Synchronous = true
}
if other.MainContext != nil {
config.MainContext = other.MainContext
publisher.setMainProgramContext(other.MainContext)
}

if other.AutoCaptureSessions != nil {
config.AutoCaptureSessions = other.AutoCaptureSessions
Expand Down
11 changes: 6 additions & 5 deletions v2/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"github.com/bugsnag/bugsnag-go/v2/errors"
)

var publisher reportPublisher = new(defaultReportPublisher)
var publisher reportPublisher = newPublisher()

// Notifier sends errors to Bugsnag.
type Notifier struct {
Expand Down Expand Up @@ -84,10 +84,11 @@ func (notifier *Notifier) NotifySync(err error, sync bool, rawData ...interface{
// AutoNotify notifies Bugsnag of any panics, then repanics.
// It sends along any rawData that gets passed in.
// Usage:
// go func() {
// defer AutoNotify()
// // (possibly crashy code)
// }()
//
// go func() {
// defer AutoNotify()
// // (possibly crashy code)
// }()
func (notifier *Notifier) AutoNotify(rawData ...interface{}) {
if err := recover(); err != nil {
severity := notifier.getDefaultSeverity(rawData, SeverityError)
Expand Down
82 changes: 73 additions & 9 deletions v2/report_publisher.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,74 @@
package bugsnag

import "fmt"
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
)

type reportPublisher interface {
publishReport(*payload) error
setMainProgramContext(context.Context)
}

type defaultReportPublisher struct{}
func (defPub *defaultReportPublisher) delivery() {
signalsCh := make(chan os.Signal, 1)
signal.Notify(signalsCh, syscall.SIGINT, syscall.SIGTERM)

func (*defaultReportPublisher) publishReport(p *payload) error {
waitForEnd:
for {
select {
case <-signalsCh:
defPub.isClosing = true
break waitForEnd
case <-defPub.mainProgramCtx.Done():
defPub.isClosing = true
break waitForEnd
case p, ok := <-defPub.eventsChan:
if ok {
if err := p.deliver(); err != nil {
// Ensure that any errors are logged if they occur in a goroutine.
p.logf("bugsnag/defaultReportPublisher.publishReport: %v", err)
}
} else {
p.logf("Event channel closed")
return
}
}
}

// Send remaining elements from the queue
close(defPub.eventsChan)
for p := range defPub.eventsChan {
if err := p.deliver(); err != nil {
// Ensure that any errors are logged if they occur in a goroutine.
p.logf("bugsnag/defaultReportPublisher.publishReport: %v", err)
}
}
}

type defaultReportPublisher struct {
eventsChan chan *payload
mainProgramCtx context.Context
isClosing bool
}

func newPublisher() reportPublisher {
defPub := defaultReportPublisher{isClosing: false, mainProgramCtx: context.TODO()}
defPub.eventsChan = make(chan *payload, 100)

go defPub.delivery()

return &defPub
}

func (defPub *defaultReportPublisher) setMainProgramContext(ctx context.Context) {
defPub.mainProgramCtx = ctx
}

func (defPub *defaultReportPublisher) publishReport(p *payload) error {
p.logf("notifying bugsnag: %s", p.Message)
if !p.notifyInReleaseStage() {
return fmt.Errorf("not notifying in %s", p.ReleaseStage)
Expand All @@ -17,11 +77,15 @@ func (*defaultReportPublisher) publishReport(p *payload) error {
return p.deliver()
}

go func(p *payload) {
if err := p.deliver(); err != nil {
// Ensure that any errors are logged if they occur in a goroutine.
p.logf("bugsnag/defaultReportPublisher.publishReport: %v", err)
}
}(p)
if defPub.isClosing {
return fmt.Errorf("main program is stopping, new events won't be sent")
}

select {
case defPub.eventsChan <- p:
default:
p.logf("Events channel full. Discarding value")
}

return nil
}

0 comments on commit 1bf0498

Please sign in to comment.