Allow to send empty subscription and update version afterward #11264

Merged
merged 12 commits into from
Jul 30, 2024
5 changes: 2 additions & 3 deletions xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java
@@ -152,9 +152,8 @@ void adjustResourceSubscription(XdsResourceType<?> resourceType) {
       startRpcStream();
     }
     Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, resourceType);
-    if (resources != null) {
-      adsStream.sendDiscoveryRequest(resourceType, resources);
-    }
+    adsStream.sendDiscoveryRequest(resourceType,
+        resources == null ? Collections.emptySet() : resources);
   }
 
   /**
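
To make the behavioral change in this hunk concrete, here is a minimal, self-contained sketch. `RecordingStream` and `EmptySubscriptionSketch` are hypothetical stand-ins invented for illustration, not grpc-java classes, and a plain string replaces the real `XdsResourceType<?>` argument.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for the ADS stream; it only records what would be sent. */
final class RecordingStream {
  final List<Collection<String>> sent = new ArrayList<>();

  void sendDiscoveryRequest(String typeUrl, Collection<String> resources) {
    sent.add(resources);
  }
}

final class EmptySubscriptionSketch {
  /** Behavior before the change: no request at all when nothing is subscribed. */
  static void adjustOld(RecordingStream stream, String typeUrl, Collection<String> resources) {
    if (resources != null) {
      stream.sendDiscoveryRequest(typeUrl, resources);
    }
  }

  /** Behavior after the change: a null collection is sent as an empty subscription. */
  static void adjustNew(RecordingStream stream, String typeUrl, Collection<String> resources) {
    stream.sendDiscoveryRequest(typeUrl,
        resources == null ? Collections.<String>emptySet() : resources);
  }

  public static void main(String[] args) {
    RecordingStream before = new RecordingStream();
    adjustOld(before, "eds", null);
    RecordingStream after = new RecordingStream();
    adjustNew(after, "eds", null);
    System.out.println(before.sent.size() + " request(s) before, " + after.sent.size() + " after");
  }
}
```

With the old path the server never learns that the last resource of a type was dropped; with the new path it receives an explicit request with an empty resource list.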
3 changes: 1 addition & 2 deletions xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java
@@ -281,7 +281,7 @@ public <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T>
       @SuppressWarnings("unchecked")
       public void run() {
         ResourceSubscriber<T> subscriber =
-            (ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);;
+            (ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);
         subscriber.removeWatcher(watcher);
         if (!subscriber.isWatched()) {
           subscriber.cancelResourceWatch();
@@ -291,7 +291,6 @@ public void run() {
         }
         if (resourceSubscribers.get(type).isEmpty()) {
           resourceSubscribers.remove(type);
-          subscribedResourceTypeUrls.remove(type.typeUrl());
Member

We definitely want to clean up the resource. Nonces are per-resource? Then it seems when the client starts watches again the server should notice the lack of nonce. The issue might be instead that we aren't cleaning up AdsStream.respNonces?

Note that maybe we should do the new I/O you are causing in this PR, but maybe we allow sending the ACK even when subscribedResourceTypeUrls lacks the type.

Contributor

Instead of removing line 294, have it call a cleanup method on the subscriber.controlPlaneClient (if it isn't null) to remove the nonce. You'll have to create the cleanup method that you'll be calling.

Member

That sounds good to me. @lujiajing1126, if it turns out to be annoying to make that change, tell us and we'll see how we can help. Also, if you think that wouldn't fully address what you noticed, say so. I don't fully understand "adjustResourceSubscription issue;" it just looks like the same nonce issue to me.

Member

(Also, if you are uncertain about the changes, you can send them out before you update/fix any tests. A sort of early review.)

Contributor Author

> We definitely want to clean up the resource. Nonces are per-resource? Then it seems when the client starts watches again the server should notice the lack of nonce. The issue might be instead that we aren't cleaning up AdsStream.respNonces?

> Instead of removing line 294, have it call a cleanup method on the subscriber.controlPlaneClient (if it isn't null) to remove the nonce. You'll have to create the cleanup method that you'll be calling.

I agree with both of you. Instead of creating a separate cleanup method, I've merged the cleanup logic into the existing adjustResourceSubscription method. PTAL.

Contributor Author

> That sounds good to me. @lujiajing1126, if it turns out to be annoying to make that change, tell us and we'll see how we can help. Also, if you think that wouldn't fully address what you noticed, say so. I don't fully understand "adjustResourceSubscription issue;" it just looks like the same nonce issue to me.

Yes, exactly.

Contributor Author

> (Also, if you are uncertain about the changes, you can send them out before you update/fix any tests. A sort of early review.)

I tried to fix this issue based on the comment, without modifying the test cases yet.

Contributor

Please modify the failing test case to expect the nonce to be reset.

Contributor Author

> Please modify the failing test case to expect the nonce to be reset.

The test case has been fixed, with an additional helper to access the underlying private/package-private fields.
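
The cleanup discussed in this thread could look roughly like the following. This is only a sketch under assumptions: `NonceCleanupSketch`, its `Request` record, and the string-keyed `respNonces` map are hypothetical simplifications, and the code that was actually merged may be structured differently. The idea is to send the (possibly empty) subscription with the last known nonce, then forget the nonce once no resources of that type remain.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class NonceCleanupSketch {
  /** One recorded request: type URL, resource names, and the nonce attached to it. */
  static final class Request {
    final String typeUrl;
    final Collection<String> resources;
    final String nonce;

    Request(String typeUrl, Collection<String> resources, String nonce) {
      this.typeUrl = typeUrl;
      this.resources = resources;
      this.nonce = nonce;
    }
  }

  // Hypothetical per-type nonce bookkeeping, loosely modeled on AdsStream.respNonces.
  final Map<String, String> respNonces = new HashMap<>();
  // Requests that would have gone out on the stream.
  final List<Request> sent = new ArrayList<>();

  /** Remember the nonce of the latest response for this type. */
  void onResponse(String typeUrl, String nonce) {
    respNonces.put(typeUrl, nonce);
  }

  void adjustResourceSubscription(String typeUrl, Collection<String> subscribed) {
    Collection<String> resources =
        subscribed == null ? Collections.<String>emptySet() : subscribed;
    // Send the request with whatever nonce is currently known ("" if none).
    sent.add(new Request(typeUrl, resources, respNonces.getOrDefault(typeUrl, "")));
    if (resources.isEmpty()) {
      // Nothing of this type is subscribed any more, so drop the stale nonce.
      respNonces.remove(typeUrl);
    }
  }
}
```

In this sketch, a later re-subscription to the same type starts with an empty nonce instead of reusing the nonce left over from the earlier subscription.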

         }
       }
     }
41 changes: 40 additions & 1 deletion xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java
@@ -428,7 +428,6 @@ private void verifyResourceCount(
       XdsResourceType<?> type,
       int size) {
     if (size == 0) {
-      assertThat(subscribedTypeUrls.containsKey(type.typeUrl())).isFalse();
       assertThat(subscribedResourcesMetadata.containsKey(type)).isFalse();
     } else {
       assertThat(subscribedTypeUrls.containsKey(type.typeUrl())).isTrue();
@@ -2757,6 +2756,46 @@ public void edsResourceNotFound() {
     verifySubscribedResourcesMetadataSizes(0, 0, 0, 1);
   }
 
+  @Test
+  public void edsAllowRespondAfterUnsubscription() {
+    Assume.assumeFalse(ignoreResourceDeletion());
+
+    // Suppose we have an EDS subscription A.1
+    xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher);
+    DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
+    assertThat(call).isNotNull();
+    verifyResourceMetadataRequested(EDS, "A.1");
+    verifySubscribedResourcesMetadataSizes(0, 0, 0, 1);
+
+    // EDS -> {A.1}, version 1
+    List<Message> dropOverloads = ImmutableList.of();
+    List<Message> endpointsV1 = ImmutableList.of(lbEndpointHealthy);
+    ImmutableMap<String, Any> resourcesV1 = ImmutableMap.of(
+        "A.1", Any.pack(mf.buildClusterLoadAssignment("A.1", endpointsV1, dropOverloads)));
+    call.sendResponse(EDS, resourcesV1.values().asList(), VERSION_1, "0000");
+    // {A.1} -> ACK, version 1
+    verifyResourceMetadataAcked(EDS, "A.1", resourcesV1.get("A.1"), VERSION_1, TIME_INCREMENT);
+    verify(edsResourceWatcher, times(1)).onChanged(any());
+
+    // trigger an EDS resource unsubscription.
+    // This would probably be caused by CDS PUSH(let's say event e1) in the real world.
+    // Then there can be a potential data race between
+    // 1) the EDS unsubscription caused by CDS PUSH e1 (client-side) and,
+    // 2) the immediate EDS PUSH from XdsServer (server-side) after CDS PUSH e1 (event e2).
+    xdsClient.cancelXdsResourceWatch(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher);
+    // FIX(1): allow to send empty subscription
+    call.verifyRequest(EDS, Collections.emptyList(), VERSION_1, "0000", NODE);
+    verifySubscribedResourcesMetadataSizes(0, 0, 0, 0);
+    // An EDS PUSH after CDS PUSH e1.
+    List<Message> endpointsV2 = ImmutableList.of(lbEndpointHealthy);
+    ImmutableMap<String, Any> resourcesV2 = ImmutableMap.of(
+        "A.1", Any.pack(mf.buildClusterLoadAssignment("A.1", endpointsV2, dropOverloads)));
+    call.sendResponse(EDS, resourcesV2.values().asList(), VERSION_2, "0001");
+    // FIX(2): allow to update resource version even if the subscription resource is empty
+    call.verifyRequest(EDS, Collections.emptyList(), VERSION_2, "0001", NODE);
+    verifyNoMoreInteractions(edsResourceWatcher);
+  }
+
   @Test
   public void edsResponseErrorHandling_allResourcesFailedUnpack() {
     DiscoveryRpcCall call = startResourceWatcher(XdsEndpointResource.getInstance(), EDS_RESOURCE,
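
The race that `edsAllowRespondAfterUnsubscription` exercises can be modeled without any xDS machinery. The sketch below is a toy model under stated assumptions: `AckAfterUnsubscribeSketch` and its fields are hypothetical, and versions and nonces are plain strings. It shows the intended outcome only: a response that arrives after the last watcher is gone is still ACKed with its version and nonce, but no watcher is notified.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class AckAfterUnsubscribeSketch {
  // Resource names that currently have a watcher.
  final Set<String> watched = new HashSet<>();
  // Resource names delivered to watchers, in order.
  final List<String> notified = new ArrayList<>();
  // ACKs sent back to the server, recorded as "version/nonce".
  final List<String> acks = new ArrayList<>();

  void handleResponse(String version, String nonce, Map<String, String> resources) {
    for (String name : resources.keySet()) {
      if (watched.contains(name)) {
        notified.add(name); // only still-watched resources reach a watcher
      }
    }
    // ACK unconditionally, even if no resource of this type is watched any more.
    acks.add(version + "/" + nonce);
  }
}
```

Mirroring the test: watch `A.1`, handle a response with version `1` and nonce `0000`, remove the watch, then handle a response with version `2` and nonce `0001`. The second response adds an ACK but leaves `notified` unchanged.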