Skip to content

Commit

Permalink
feat: discard disconnected game server streams (#3328)
Browse files Browse the repository at this point in the history
* feat: discard disconnected game server streams

* fix: lint
  • Loading branch information
antiphp authored Aug 17, 2023
1 parent e24353a commit a2b9806
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 12 deletions.
6 changes: 4 additions & 2 deletions pkg/sdkserver/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,14 @@ func (m *emptyMockStream) RecvMsg(msg interface{}) error {
}

type gameServerMockStream struct {
ctx context.Context
msgs chan *sdk.GameServer
}

// newGameServerMockStream implements SDK_WatchGameServerServer for testing
func newGameServerMockStream() *gameServerMockStream {
return &gameServerMockStream{
ctx: context.Background(),
msgs: make(chan *sdk.GameServer, 10),
}
}
Expand All @@ -125,8 +127,8 @@ func (*gameServerMockStream) SetTrailer(metadata.MD) {
panic("implement me")
}

func (*gameServerMockStream) Context() netcontext.Context {
panic("implement me")
func (m *gameServerMockStream) Context() netcontext.Context {
return m.ctx
}

func (*gameServerMockStream) SendMsg(m interface{}) error {
Expand Down
29 changes: 20 additions & 9 deletions pkg/sdkserver/sdkserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,15 +826,26 @@ func (s *SDKServer) RemoveListValue(ctx context.Context, in *alpha.RemoveListVal
func (s *SDKServer) sendGameServerUpdate(gs *agonesv1.GameServer) {
s.logger.Debug("Sending GameServer Event to connectedStreams")

s.streamMutex.RLock()
defer s.streamMutex.RUnlock()

for _, stream := range s.connectedStreams {
err := stream.Send(convert(gs))
// We essentially ignoring any disconnected streams.
// I think this is fine, as disconnections shouldn't actually happen.
// but we should log them, just in case they do happen, and we can track it
if err != nil {
s.streamMutex.Lock()
defer s.streamMutex.Unlock()

for i, stream := range s.connectedStreams {
select {
case <-stream.Context().Done():
s.connectedStreams = append(s.connectedStreams[:i], s.connectedStreams[i+1:]...)

err := stream.Context().Err()
switch {
case err != nil:
s.logger.WithError(errors.WithStack(err)).Error("stream closed with error")
default:
s.logger.Debug("stream closed")
}
continue
default:
}

if err := stream.Send(convert(gs)); err != nil {
s.logger.WithError(errors.WithStack(err)).
Error("error sending game server update event")
}
Expand Down
61 changes: 60 additions & 1 deletion pkg/sdkserver/sdkserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,65 @@ func TestSDKServerSendGameServerUpdate(t *testing.T) {
assert.Equal(t, fixture.ObjectMeta.Name, sdkGS.ObjectMeta.Name)
}

func TestSDKServer_SendGameServerUpdateRemovesDisconnectedStream(t *testing.T) {
t.Parallel()

fixture := &agonesv1.GameServer{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "default",
},
Status: agonesv1.GameServerStatus{
State: agonesv1.GameServerStateReady,
},
}

m := agtesting.NewMocks()
fakeWatch := watch.NewFake()
m.AgonesClient.AddWatchReactor("gameservers", k8stesting.DefaultWatchReactor(fakeWatch, nil))
sc, err := defaultSidecar(m)
require.NoError(t, err)
assert.Empty(t, sc.connectedStreams)

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
sc.ctx = ctx
sc.informerFactory.Start(ctx.Done())

fakeWatch.Add(fixture.DeepCopy())
assert.True(t, cache.WaitForCacheSync(ctx.Done(), sc.gameServerSynced))
sc.gsWaitForSync.Done()

// Wait for the GameServer to be populated, as we can't rely on WaitForCacheSync.
require.Eventually(t, func() bool {
_, err := sc.gameServer()
return err == nil
}, time.Minute, time.Second, "Could not find the GameServer")

streamCtx, streamCancel := context.WithCancel(context.Background())
t.Cleanup(streamCancel)

// Trigger stream removal by sending an update on a cancelled stream.

stream := newGameServerMockStream()
stream.ctx = streamCtx

asyncWatchGameServer(t, sc, stream)
assert.Nil(t, waitConnectedStreamCount(sc, 1))

<-stream.msgs // Initial msg when WatchGameServer() is called.

streamCancel()

sc.sendGameServerUpdate(fixture)

select {
case <-stream.msgs:
assert.Fail(t, "Event stream should have been removed.")
case <-time.After(1 * time.Second):
}
}

func TestSDKServerUpdateEventHandler(t *testing.T) {
t.Parallel()
fixture := &agonesv1.GameServer{
Expand Down Expand Up @@ -1545,7 +1604,7 @@ func waitForMessage(sc *SDKServer) error {
})
}

func waitConnectedStreamCount(sc *SDKServer, count int) error {
func waitConnectedStreamCount(sc *SDKServer, count int) error { //nolint:unparam // Keep flexibility.
return wait.PollImmediate(1*time.Second, 10*time.Second, func() (bool, error) {
sc.streamMutex.RLock()
defer sc.streamMutex.RUnlock()
Expand Down

0 comments on commit a2b9806

Please sign in to comment.