clusterresolver: Avoid blocking for subsequent resolver updates in test #7937
base: master
Conversation
Codecov Report: All modified and coverable lines are covered by tests ✅

@@            Coverage Diff             @@
##           master    #7937      +/-   ##
==========================================
- Coverage   82.08%   81.95%    -0.13%
==========================================
  Files         379      381        +2
  Lines       38261    38535      +274
==========================================
+ Hits        31406    31582      +176
- Misses       5551     5630       +79
- Partials     1304     1323       +19
```go
select {
case resolverUpdateCh <- ccs.ResolverState:
default:
	// Don't block forever in case of multiple updates.
}
```
I have received comments against performing non-deterministic writes to channels like this in tests, because it can itself lead to flakiness.

In the PR description, you mentioned that when the test failed, there were no logs. Were you able to repro this on g3 with logs enabled to figure out the exact cause of the problem?
I was able to consistently repro the failure in g3. There is a race which causes duplicate resolver updates to be sent to the leaf round robin policy.
Normal flow

1. clusterresolver starts the EDS watch by calling resourceResolver.updateMechanisms:

   ```go
   func (rr *resourceResolver) updateMechanisms(mechanisms []DiscoveryMechanism) {
   ```

2. After the EDS watch is started, updateMechanisms attempts to send an update to the child balancers by calling generateLocked:

   ```go
   rr.generateLocked(func() {})
   ```

3. generateLocked sees that EDS is yet to produce its first result, so the children are not updated (grpc-go/xds/internal/balancer/clusterresolver/resource_resolver.go, lines 285 to 294 in b3bdacb):

   ```go
   func (rr *resourceResolver) generateLocked(onDone xdsresource.OnDoneFunc) {
   	var ret []priorityConfig
   	for _, rDM := range rr.children {
   		u, ok := rDM.r.lastUpdate()
   		if !ok {
   			// Don't send updates to parent until all resolvers have update to
   			// send.
   			onDone()
   			return
   		}
   ```

4. EDS produces its first result and calls resourceResolver.onUpdate, which queues a call to generateLocked (grpc-go/xds/internal/balancer/clusterresolver/resource_resolver.go, lines 315 to 322 in b3bdacb):

   ```go
   func (rr *resourceResolver) onUpdate(onDone xdsresource.OnDoneFunc) {
   	handleUpdate := func(context.Context) {
   		rr.mu.Lock()
   		rr.generateLocked(onDone)
   		rr.mu.Unlock()
   	}
   	rr.serializer.ScheduleOr(handleUpdate, func() { onDone() })
   }
   ```

5. This time generateLocked updates the child balancer with a new resolver state.

In this flow, only one update is sent to the round_robin balancer.
Exceptional flow

1. clusterresolver starts the EDS watch by calling resourceResolver.updateMechanisms:

   ```go
   func (rr *resourceResolver) updateMechanisms(mechanisms []DiscoveryMechanism) {
   ```

2. While updateMechanisms is still executing, EDS produces its first result and calls resourceResolver.onUpdate, which queues a call to generateLocked (grpc-go/xds/internal/balancer/clusterresolver/resource_resolver.go, lines 315 to 322 in b3bdacb):

   ```go
   func (rr *resourceResolver) onUpdate(onDone xdsresource.OnDoneFunc) {
   	handleUpdate := func(context.Context) {
   		rr.mu.Lock()
   		rr.generateLocked(onDone)
   		rr.mu.Unlock()
   	}
   	rr.serializer.ScheduleOr(handleUpdate, func() { onDone() })
   }
   ```

3. updateMechanisms attempts to send an update to the child balancers by calling generateLocked:

   ```go
   rr.generateLocked(func() {})
   ```

4. Since EDS has already produced a result, generateLocked updates the child with new resolver state.

5. The call to generateLocked queued in step 2 is then executed, updating the child balancer again.

In this flow, the leaf round_robin gets two updates. Since the channel used to spy on resolver updates has capacity 1, the second write blocks indefinitely.
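To make the failure mode concrete, here is a minimal standalone sketch (names are illustrative, not from the test): with a capacity-1 channel and no concurrent reader, the second of two duplicate sends blocks forever unless the send is made non-blocking.

```go
package main

import "fmt"

func main() {
	updateCh := make(chan string, 1) // capacity 1, like the test's spy channel

	updateCh <- "update-1" // fills the buffer

	// A plain `updateCh <- "update-2"` would block here forever, since
	// nothing is draining the channel yet. The non-blocking form drops
	// the duplicate instead:
	select {
	case updateCh <- "update-2":
	default:
		fmt.Println("buffer full; dropped duplicate update")
	}

	fmt.Println("received:", <-updateCh) // prints "update-1"
}
```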
Thanks for the detailed explanation.

The fix you have here is the simplest one, but it still means that there is some non-determinism in the test. One way to handle this would be as follows. In t.Run(), change the order of the steps (see the sketch below):

- Start the management server, and override OnStreamRequest to get notified when the EDS resource specific to the test is being requested.
- Create the xDS client.
- Create the manual resolver and the grpc channel (and ask it to connect).
- Wait for the EDS resource to be requested from the management server.
- Now, configure the resource on the management server.
- Make the RPC and verify that it succeeds.
- Verify that the expected update is pushed to the child policy.

I understand this will result in more changes compared to your fix, but I feel this gets rid of the non-determinism that comes from dropping updates in the parent.
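A rough sketch of that ordering, assuming grpc-go's internal e2e management-server helpers (StartManagementServer and the ManagementServerOptions.OnStreamRequest hook); edsServiceName and ctx are hypothetical test-local values:

```go
// Sketch only, not the exact test code. edsServiceName is a hypothetical
// test constant; ctx is the test's deadline-bound context.
edsRequested := make(chan struct{}, 1)
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
	OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
		// Signal (at most once) when the test's EDS resource is requested.
		for _, name := range req.GetResourceNames() {
			if name == edsServiceName {
				select {
				case edsRequested <- struct{}{}:
				default:
				}
			}
		}
		return nil
	},
})

// ... create the xDS client, the manual resolver, and the grpc channel ...

// Wait for the EDS request before configuring the resource.
select {
case <-edsRequested:
case <-ctx.Done():
	t.Fatal("timeout waiting for the EDS resource to be requested")
}
// Only now configure the resource on mgmtServer, make the RPC, and verify
// the update pushed to the child policy.
```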
Also, while you are here, if you could replace calls to net.Listen("tcp", "localhost:0") with calls to testutils.LocalTCPListener(), that would be great too. The implementation of the latter is exactly the same as the former in OSS, but in forge it uses the portpicker to pick a free port before calling net.Listen, because we have had flakes in the past on forge when using 0 for the port number.
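The swap is mechanical; a sketch, assuming the usual fatal-on-error handling in these tests:

```go
// Before:
//   lis, err := net.Listen("tcp", "localhost:0")
// After (testutils is google.golang.org/grpc/internal/testutils):
lis, err := testutils.LocalTCPListener()
if err != nil {
	t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
defer lis.Close()
```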
> Also, while you are here, if you could replace calls to net.Listen("tcp", "localhost:0") with calls to testutils.LocalTCPListener()?

Done.
I tried following the suggested approach. It seems to solve the flakiness by delaying the creation of the round robin balancer. However, if the clusterresolver LB policy is created with no localities, it logs an error, causing the test to fail (grpc-go/xds/internal/balancer/clusterresolver/clusterresolver.go, lines 329 to 332 in 063d352):

```go
if b.child == nil {
	b.logger.Errorf("xds: received ExitIdle with no child balancer")
	break
}
```

If I provide empty localities in the initial xDS resources, the child round robin is created and we end up with a similar problem of getting either 1 or 2 updates.
To avoid the issue with determining the channel size, I switched to using a mutex instead of a channel. The round robin picker may still see 1 or 2 updates, but the test will pass regardless of which update is used for comparison since the updates are duplicates.
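For reference, a minimal sketch of such a mutex-based spy (names are illustrative; childPolicyName is hypothetical):

```go
var (
	mu                sync.Mutex
	lastResolverState resolver.State
)

// Record the most recent state under the lock instead of sending on a
// channel; duplicate updates simply overwrite each other.
stub.Register(childPolicyName, stub.BalancerFuncs{
	UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
		mu.Lock()
		lastResolverState = ccs.ResolverState
		mu.Unlock()
		return bd.Data.(balancer.Balancer).UpdateClientConnState(ccs)
	},
})

// Later, the test reads the recorded state under the same lock and
// compares it against the expected resolver state.
mu.Lock()
gotState := lastResolverState
mu.Unlock()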
```diff
@@ -1232,7 +1233,9 @@ func (s) TestEDS_EndpointWithMultipleAddresses(t *testing.T) {
 		bd.Data.(balancer.Balancer).Close()
 	},
 	UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
-		resolverUpdateCh <- ccs.ResolverState
```
I think this would be more concise and express the same thing if it used an atomic.Pointer instead of a separate mutex.
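For illustration, an atomic.Pointer version of the same spy (Go 1.19+; a fragment in the style of the diff above, with illustrative names):

```go
var resolverState atomic.Pointer[resolver.State]

// In the stub balancer: copy the state, then publish it atomically.
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
	state := ccs.ResolverState
	resolverState.Store(&state)
	return bd.Data.(balancer.Balancer).UpdateClientConnState(ccs)
},

// In the test body, after the RPC succeeds:
if got := resolverState.Load(); got != nil {
	// compare *got against the expected resolver state
}
```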
Changed to use an atomic pointer.
Fixes: #7961
This test was added recently in #7858. In the reported test failure, the stack trace indicates that a goroutine was blocked forever; there were no failure logs, though. This change avoids writing to the resolver update channel if its buffer is full.
RELEASE NOTES: N/A