Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stats: Improved sequencing documentation for server-side stats events and added tests. #7885

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ type RPCStats interface {
IsClient() bool
}

// Begin contains stats when an RPC attempt begins.
// Begin contains stats when an RPC attempt begins. This event is called after
// the InHeader event, as headers must be processed before the RPC lifecycle
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit InHeader

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What specifically is the issue here? Is it related to style?

// begins.
//
// FailFast is only valid if this Begin is from client side.
type Begin struct {
// Client is true if this Begin is from client side.
Expand Down Expand Up @@ -98,7 +101,9 @@ func (s *InPayload) IsClient() bool { return s.Client }

func (s *InPayload) isRPCStats() {}

// InHeader contains stats when a header is received.
// InHeader contain stats when the header is received. It is the first event in
// the server after receiving the RPC. It is followed by the OutPayload
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OutPayload

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you clarify what you are asking for here?

// server event.
type InHeader struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RyanBlaney we should just edit the documentation in current struct instead of deleting it entirely and re-creating. Also, restrict the documentation to struct instead of writing the whole sequence. Mentioning that it is the first event on server side after rpc is received is fine.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll get right to it! Thank you!

// Client is true if this InHeader is from client side.
Client bool
Expand Down
153 changes: 153 additions & 0 deletions stats/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,50 @@ var (
}
// The id for which the service handler should return error.
errorID int32 = 32202
// To verify if the Unary RPC server stats events are logged in the
// correct order.
expectedUnarySequence = []string{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be initialized within the test itself because we are not using it anywhere else. Also, convention is to use want/wanted instead of expected

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it! I will keep this in mind going forward. Thank you!

"ConnStats",
"InHeader",
"Begin",
"InPayload",
"OutHeader",
"OutPayload",
"OutTrailer",
"End",
}
// To verify if the Client Stream RPC server stats events are logged in the
// correct order.
expectedClientStreamSequence = []string{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same. can be moved to the respective test. Also, wantClientStreamSequence

"ConnStats",
"InHeader",
"Begin",
"OutHeader",
"InPayload",
"InPayload",
"InPayload",
"InPayload",
"InPayload",
"OutPayload",
"OutTrailer",
"End",
}
// To verify if the Server Stream RPC server stats events are logged in the
// correct order.
expectedServerStreamSequence = []string{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same. can be moved to respective test. s/expectedServerStreamSequence/wantServerStreamSequence

"ConnStats",
"InHeader",
"Begin",
"InPayload",
"OutHeader",
"OutPayload",
"OutPayload",
"OutPayload",
"OutPayload",
"OutPayload",
"OutTrailer",
"End",
}
)

func idToPayload(id int32) *testpb.Payload {
Expand Down Expand Up @@ -242,6 +286,8 @@ func newTest(t *testing.T, tc *testConfig, chs []stats.Handler, shs []stats.Hand

// startServer starts a gRPC server listening. Callers should defer a
// call to te.tearDown to clean up.
//
// Uses deprecated opts rpc.(RPCCompressor, RPCDecompressor, WithBlock, Dial)
func (te *test) startServer(ts testgrpc.TestServiceServer) {
te.testServer = ts
lis, err := net.Listen("tcp", "localhost:0")
Expand Down Expand Up @@ -786,8 +832,13 @@ func checkConnEnd(t *testing.T, d *gotData) {
st.IsClient() // TODO remove this.
}

type event struct {
eventType string
}

type statshandler struct {
mu sync.Mutex
events []event
gotRPC []*gotData
gotConn []*gotData
}
Expand All @@ -800,13 +851,41 @@ func (h *statshandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) conte
return context.WithValue(ctx, rpcCtxKey{}, info)
}

// recordEvent records an event in the statshandler along with a timestamp.
func (h *statshandler) recordEvent(eventType string) {
h.mu.Lock()
defer h.mu.Unlock()
h.events = append(h.events, event{eventType: eventType})
}

func (h *statshandler) HandleConn(ctx context.Context, s stats.ConnStats) {
h.recordEvent("ConnStats")

h.mu.Lock()
defer h.mu.Unlock()
h.gotConn = append(h.gotConn, &gotData{ctx, s.IsClient(), s})
}

func (h *statshandler) HandleRPC(ctx context.Context, s stats.RPCStats) {
switch s.(type) {
case *stats.Begin:
h.recordEvent("Begin")
case *stats.InHeader:
h.recordEvent("InHeader")
case *stats.InPayload:
h.recordEvent("InPayload")
case *stats.OutHeader:
h.recordEvent("OutHeader")
case *stats.OutPayload:
h.recordEvent("OutPayload")
case *stats.InTrailer:
h.recordEvent("InTrailer")
case *stats.OutTrailer:
h.recordEvent("OutTrailer")
case *stats.End:
h.recordEvent("End")
}

h.mu.Lock()
defer h.mu.Unlock()
h.gotRPC = append(h.gotRPC, &gotData{ctx, s.IsClient(), s})
Expand Down Expand Up @@ -1519,3 +1598,77 @@ func (s) TestStatsHandlerCallsServerIsRegisteredMethod(t *testing.T) {
}
wg.Wait()
}

// TestServerStatsUnaryRPCEventSequence tests that the sequence of server-side stats
// events for a Unary RPC matches the expected flow.
func (s) TestServerStatsUnaryRPCEventSequence(t *testing.T) {
h := &statshandler{}
te := newTest(t, &testConfig{compress: ""}, nil, []stats.Handler{h})
te.startServer(&testServer{})
defer te.tearDown()

_, _, err := te.doUnaryCall(&rpcConfig{success: true, callType: unaryRPC})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Allow time for events to propagate
time.Sleep(50 * time.Millisecond)
// Verify sequence
h.mu.Lock()
defer h.mu.Unlock()
verifyEventSequence(t, h.events, expectedUnarySequence)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same. cmp.Equal

}

// TestServerStatsClientStreamEventSequence tests that the sequence of server-side
// stats events for a Client Stream RPC matches the expected flow.
func (s) TestServerStatsClientStreamEventSequence(t *testing.T) {
h := &statshandler{}
te := newTest(t, &testConfig{compress: "gzip"}, nil, []stats.Handler{h})
te.startServer(&testServer{})
defer te.tearDown()

_, _, err := te.doClientStreamCall(&rpcConfig{count: 5, success: true, callType: clientStreamRPC})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

time.Sleep(50 * time.Millisecond)

h.mu.Lock()
defer h.mu.Unlock()
verifyEventSequence(t, h.events, expectedClientStreamSequence)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same. Prefer cmp.Equal

}

// TestServerStatsClientStreamEventSequence tests that the sequence of server-side
// stats events for a Client Stream RPC matches the expected flow.
func (s) TestServerStatsServerStreamEventSequence(t *testing.T) {
h := &statshandler{}
te := newTest(t, &testConfig{compress: "gzip"}, nil, []stats.Handler{h})
te.startServer(&testServer{})
defer te.tearDown()

_, _, err := te.doServerStreamCall(&rpcConfig{count: 5, success: true, callType: serverStreamRPC})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

time.Sleep(50 * time.Millisecond)

h.mu.Lock()
defer h.mu.Unlock()
verifyEventSequence(t, h.events, expectedServerStreamSequence)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't we use cmp.Equal? Here we do care about order being same as well.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I replaced with cmp.Equal although I used it in the helper function for reusability.

}

// verifyEventSequence verifies that a sequence of recorded events matches
// the expected sequence.
func verifyEventSequence(t *testing.T, got []event, expected []string) {
if len(got) != len(expected) {
t.Fatalf("Event count mismatch. Got: %d, Expected: %d", len(got), len(expected))
}

for i, e := range got {
if e.eventType != expected[i] {
t.Errorf("Unexpected event at position %d. Got: %s, Expected: %s", i, e.eventType, expected[i])
}
}
}
Loading