
clusterresolver: Avoid blocking for subsequent resolver updates in test #7937

Open
wants to merge 6 commits into master

Conversation

arjan-bal (Contributor) commented Dec 17, 2024

Fixes: #7961

This test was added recently in #7858. In the reported failure, the stack trace indicates that a goroutine was blocked forever, but there were no failure logs. This change avoids writing to the resolver update channel when its buffer is full.

RELEASE NOTES: N/A

codecov bot commented Dec 17, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 81.95%. Comparing base (e8055ea) to head (8d114e6).
Report is 10 commits behind head on master.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #7937      +/-   ##
==========================================
- Coverage   82.08%   81.95%   -0.13%     
==========================================
  Files         379      381       +2     
  Lines       38261    38535     +274     
==========================================
+ Hits        31406    31582     +176     
- Misses       5551     5630      +79     
- Partials     1304     1323      +19     

see 29 files with indirect coverage changes

Comment on lines 1235 to 1239
select {
case resolverUpdateCh <- ccs.ResolverState:
default:
	// Don't block forever in case of multiple updates.
}
Contributor
I have received comments in the past against performing non-deterministic writes to channels like this in tests, because it can lead to flakiness as well.

In the PR description, you mentioned that when the test failed, there were no logs. Were you able to repro this on g3 with logs enabled to figure out the exact cause of the problem?

Contributor Author
I was able to consistently repro the failure in g3. There is a race which causes duplicate resolver updates to be sent to the leaf round robin policy.

Normal flow

  1. clusterresolver starts the eds watch by calling resourceResolver.updateMechanisms:
    func (rr *resourceResolver) updateMechanisms(mechanisms []DiscoveryMechanism) {
  2. After the EDS watch is started, updateMechanisms attempts to send an update to the child balancers by calling generateLocked:
  3. generateLocked sees that EDS has not yet produced its first result, so the children are not updated:
    func (rr *resourceResolver) generateLocked(onDone xdsresource.OnDoneFunc) {
        var ret []priorityConfig
        for _, rDM := range rr.children {
            u, ok := rDM.r.lastUpdate()
            if !ok {
                // Don't send updates to parent until all resolvers have update to
                // send.
                onDone()
                return
            }
  4. When EDS produces its first result, it calls resourceResolver.onUpdate, which queues a call to generateLocked:
    func (rr *resourceResolver) onUpdate(onDone xdsresource.OnDoneFunc) {
        handleUpdate := func(context.Context) {
            rr.mu.Lock()
            rr.generateLocked(onDone)
            rr.mu.Unlock()
        }
        rr.serializer.ScheduleOr(handleUpdate, func() { onDone() })
    }
  5. This time generateLocked updates the child balancer with a new resolver state.

In this flow, only one update is sent to the round_robin balancer.

Exceptional flow

  1. clusterresolver starts the eds watch by calling resourceResolver.updateMechanisms:
    func (rr *resourceResolver) updateMechanisms(mechanisms []DiscoveryMechanism) {
  2. While updateMechanisms is still executing, EDS produces its first result and calls resourceResolver.onUpdate, which queues a call to generateLocked:
    func (rr *resourceResolver) onUpdate(onDone xdsresource.OnDoneFunc) {
        handleUpdate := func(context.Context) {
            rr.mu.Lock()
            rr.generateLocked(onDone)
            rr.mu.Unlock()
        }
        rr.serializer.ScheduleOr(handleUpdate, func() { onDone() })
    }
  3. updateMechanisms attempts to send an update to the child balancers by calling generateLocked:
  4. Since EDS has produced one result, generateLocked updates the child with new resolver state.
  5. The call to generateLocked queued in step 2 is executed, updating the child balancer again.

In this flow the leaf round robin gets two updates. Since the channel used to spy on resolver updates has capacity 1, the second write blocks indefinitely.
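To make the blocking concrete, the spy channel behaves like this (a minimal illustration, not the actual test code; firstUpdate and secondUpdate are placeholder values):

    resolverUpdateCh := make(chan resolver.State, 1)
    resolverUpdateCh <- firstUpdate  // fills the buffer and returns immediately
    resolverUpdateCh <- secondUpdate // buffer is full and nothing reads yet, so this send blocks forever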

Contributor
Thanks for the detailed explanation.

The fix you have here is the simplest one. But it still means that there is some non-determinism in the test. One way to handle this would be as follows:

  • In t.Run() change the order of the steps:
    • Start the management server, and override the OnStreamRequest to get notified when the EDS request specific to the test is being requested.
    • Create the xDS client.
    • Create the manual resolver and the grpc channel (and ask it to connect).
    • Wait for the EDS resource to be requested from the management server.
    • Now, configure the resource on the management server.
    • Make the RPC and verify that it succeeds.
    • Verify that the expected update is pushed to the child policy.

I understand this will result in more changes compared to yours, but I feel this gets rid of the non-determinism that comes out of dropping updates from the parent.
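A rough sketch of the notification part of that flow (the helper signatures and the type URL check below are assumptions based on how other xDS e2e tests are structured, not the exact code):

    // e2e is google.golang.org/grpc/internal/testutils/xds/e2e;
    // v3discoverypb is github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3.
    edsRequestedCh := make(chan struct{}, 1)
    mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
        OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
            // Signal (at most once) when the client asks for ClusterLoadAssignment resources.
            if req.GetTypeUrl() == "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment" {
                select {
                case edsRequestedCh <- struct{}{}:
                default:
                }
            }
            return nil
        },
    })

    // ... create the xDS client, the manual resolver, and the grpc channel ...

    // Wait for the EDS resource to be requested before configuring it on mgmtServer.
    // ctx is the test's deadline context.
    select {
    case <-edsRequestedCh:
    case <-ctx.Done():
        t.Fatal("Timed out waiting for the EDS resource to be requested")
    }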

Contributor
Also, while you are here, if you could replace calls to net.Listen("tcp", "localhost:0") with calls to testutils.LocalTCPListener(), that would be great too. The implementation of the latter is exactly the same as the former in OSS, but in forge it uses the portpicker to pick a free port before calling net.Listen, because we have had flakes on forge in the past when using 0 for the port number.
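For reference, the swap is mechanical; a sketch (error messages are illustrative):

    // Before:
    lis, err := net.Listen("tcp", "localhost:0")
    if err != nil {
        t.Fatalf("net.Listen() failed: %v", err)
    }

    // After: testutils (google.golang.org/grpc/internal/testutils) picks a free
    // port via portpicker in forge before listening, avoiding port-0 flakes.
    lis, err := testutils.LocalTCPListener()
    if err != nil {
        t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
    }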

Contributor Author
Also, while you are here, if you could replace calls to net.Listen("tcp", "localhost:0") with calls to testutils.LocalTCPListener()?

Done.

Contributor Author
I tried the suggested approach. It seems to solve the flakiness by delaying the creation of the round robin balancer. However, if the clusterresolver LB policy is created with no localities, it logs an error, causing the test to fail:

if b.child == nil {
    b.logger.Errorf("xds: received ExitIdle with no child balancer")
    break
}

If I provide empty localities in the initial xds resources, the child round robin is created and we end up with a similar problem of getting either 1 or 2 updates.

Contributor Author
To avoid the issue with determining the channel size, I switched to using a mutex instead of a channel. The round robin picker may still see 1 or 2 updates, but the test will pass regardless of which update is used for comparison since the updates are duplicates.
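Roughly, the mutex-based version looks like this (a sketch; variable names are illustrative, not the exact diff):

    var (
        mu               sync.Mutex
        gotResolverState resolver.State
    )
    // Inside the stub balancer registered by the test:
    UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
        mu.Lock()
        gotResolverState = ccs.ResolverState
        mu.Unlock()
        return bd.Data.(balancer.Balancer).UpdateClientConnState(ccs)
    },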

Contributor Author
@dfawley can you PTAL since @easwars is on leave.

@easwars easwars assigned arjan-bal and unassigned easwars Dec 17, 2024
@arjan-bal arjan-bal assigned easwars and unassigned arjan-bal Dec 18, 2024
@easwars easwars assigned arjan-bal and unassigned easwars Dec 20, 2024
@arjan-bal arjan-bal requested a review from dfawley December 23, 2024 09:19
@arjan-bal arjan-bal assigned dfawley and unassigned arjan-bal Dec 23, 2024
@@ -1232,7 +1233,9 @@ func (s) TestEDS_EndpointWithMultipleAddresses(t *testing.T) {
		bd.Data.(balancer.Balancer).Close()
	},
	UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
		resolverUpdateCh <- ccs.ResolverState
Member
I think this would be more concise and express the same thing if it used an atomic.Pointer instead of a separate mutex.
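For example (a sketch of the suggestion; names are illustrative):

    var gotResolverState atomic.Pointer[resolver.State]
    // Inside the stub balancer:
    UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
        state := ccs.ResolverState
        gotResolverState.Store(&state)
        return bd.Data.(balancer.Balancer).UpdateClientConnState(ccs)
    },
    // Later, the test reads gotResolverState.Load() and compares it against the
    // expected endpoints.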

Contributor Author
Changed to use an atomic pointer.

@dfawley dfawley assigned arjan-bal and unassigned dfawley Dec 23, 2024
@arjan-bal arjan-bal assigned dfawley and unassigned arjan-bal Dec 24, 2024
Development

Successfully merging this pull request may close these issues.

Flaky test: Test/EDS_EndpointWithMultipleAddresses