Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix buffered chan event system #3968

Merged
merged 1 commit into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions event/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,16 @@ func (s *System) Emit(event *Event) (wait func(context.Context) error) {
doneCh := make(chan struct{}, s.eventBuffer)
doneFn := func() {
origDoneFn()
select {
case doneCh <- struct{}{}:
default:
}
// The done must be read by the reading side to prevent
// a goroutine that waits indefinitely.
doneCh <- struct{}{}
}
event.Done = doneFn

for _, evtCh := range s.subscribers[event.Type] {
select {
case evtCh <- event:
default:
}
// The event channel must read off the channel otherwise we would
// be dropping events.
evtCh <- event
}

s.logger.WithFields(logrus.Fields{
Expand Down
40 changes: 40 additions & 0 deletions event/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,46 @@ func TestEventSystem(t *testing.T) {
wg.Wait()
})

// This ensures that the system still works even when the buffer size of
// the event system is smaller than the numSubs. We had an issue where
// when all the sub were trying to call done it would fail since the buffer
// was full and the event would never fully complete and wait indefinitely.
t.Run("emit_and_wait/buffer", func(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
logger := logrus.New()
logger.SetOutput(io.Discard)
// Not buffered
es := NewEventSystem(0, logger)

var (
wg sync.WaitGroup
numSubs = 100
)
for i := 0; i < numSubs; i++ {
sid, evtCh := es.Subscribe(Exit)
wg.Add(1)
go func() {
defer wg.Done()
_, err := processEvents(ctx, es, sid, evtCh)
require.NoError(t, err)
}()
}

var done uint32
wait := es.Emit(&Event{Type: Exit, Done: func() {
atomic.AddUint32(&done, 1)
}})
waitCtx, waitCancel := context.WithTimeout(ctx, time.Second)
defer waitCancel()
err := wait(waitCtx)
require.NoError(t, err)
assert.Equal(t, uint32(numSubs), done)

wg.Wait()
})

t.Run("emit_and_wait/error", func(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Expand Down