Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to send empty subscription and update version afterward #11264

Merged
merged 12 commits into from
Jul 30, 2024
10 changes: 8 additions & 2 deletions xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,14 @@ void adjustResourceSubscription(XdsResourceType<?> resourceType) {
startRpcStream();
}
Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, resourceType);
if (resources != null) {
adsStream.sendDiscoveryRequest(resourceType, resources);
if (resources == null) {
resources = Collections.emptyList();
}
adsStream.sendDiscoveryRequest(resourceType, resources);
if (resources.isEmpty()) {
// The resource type no longer has subscribing resources; clean up references to it
versions.remove(resourceType);
adsStream.respNonces.remove(resourceType);
}
}

Expand Down
2 changes: 1 addition & 1 deletion xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ public <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T>
@SuppressWarnings("unchecked")
public void run() {
ResourceSubscriber<T> subscriber =
(ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);;
(ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);
subscriber.removeWatcher(watcher);
if (!subscriber.isWatched()) {
subscriber.cancelResourceWatch();
Expand Down
46 changes: 45 additions & 1 deletion xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

/**
* Tests for {@link XdsClientImpl}.
Expand Down Expand Up @@ -2757,6 +2758,37 @@ public void edsResourceNotFound() {
verifySubscribedResourcesMetadataSizes(0, 0, 0, 1);
}

@Test
public void edsCleanupNonceAfterUnsubscription() {
Assume.assumeFalse(ignoreResourceDeletion());

// Suppose we have an EDS subscription A.1
xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher);
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
assertThat(call).isNotNull();
call.verifyRequest(EDS, "A.1", "", "", NODE);

// EDS -> {A.1}, version 1
List<Message> dropOverloads = ImmutableList.of();
List<Message> endpointsV1 = ImmutableList.of(lbEndpointHealthy);
ImmutableMap<String, Any> resourcesV1 = ImmutableMap.of(
"A.1", Any.pack(mf.buildClusterLoadAssignment("A.1", endpointsV1, dropOverloads)));
call.sendResponse(EDS, resourcesV1.values().asList(), VERSION_1, "0000");
// {A.1} -> ACK, version 1
call.verifyRequest(EDS, "A.1", VERSION_1, "0000", NODE);
verify(edsResourceWatcher, times(1)).onChanged(any());

// trigger an EDS resource unsubscription.
xdsClient.cancelXdsResourceWatch(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher);
verifySubscribedResourcesMetadataSizes(0, 0, 0, 0);
call.verifyRequest(EDS, Arrays.asList(), VERSION_1, "0000", NODE);

// When re-subscribing, the version and nonce were properly forgotten, so the request is the
// same as the initial request
xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher);
call.verifyRequest(EDS, "A.1", "", "", NODE, Mockito.timeout(2000).times(2));
}

@Test
public void edsResponseErrorHandling_allResourcesFailedUnpack() {
DiscoveryRpcCall call = startResourceWatcher(XdsEndpointResource.getInstance(), EDS_RESOURCE,
Expand Down Expand Up @@ -3787,10 +3819,22 @@ protected abstract static class DiscoveryRpcCall {

protected void verifyRequest(
XdsResourceType<?> type, List<String> resources, String versionInfo, String nonce,
Node node) {
Node node, VerificationMode verificationMode) {
throw new UnsupportedOperationException();
}

protected void verifyRequest(
XdsResourceType<?> type, List<String> resources, String versionInfo, String nonce,
Node node) {
verifyRequest(type, resources, versionInfo, nonce, node, Mockito.timeout(2000));
}

protected void verifyRequest(
XdsResourceType<?> type, String resource, String versionInfo, String nonce,
Node node, VerificationMode verificationMode) {
verifyRequest(type, ImmutableList.of(resource), versionInfo, nonce, node, verificationMode);
}

protected void verifyRequest(
XdsResourceType<?> type, String resource, String versionInfo, String nonce, Node node) {
verifyRequest(type, ImmutableList.of(resource), versionInfo, nonce, node);
Expand Down
5 changes: 3 additions & 2 deletions xds/src/test/java/io/grpc/xds/GrpcXdsClientImplV3Test.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
import org.mockito.ArgumentMatcher;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

/**
* Tests for {@link XdsClientImpl} with protocol version v3.
Expand Down Expand Up @@ -205,8 +206,8 @@ private DiscoveryRpcCallV3(StreamObserver<DiscoveryRequest> requestObserver,
@Override
protected void verifyRequest(
XdsResourceType<?> type, List<String> resources, String versionInfo, String nonce,
EnvoyProtoData.Node node) {
verify(requestObserver, Mockito.timeout(2000)).onNext(argThat(new DiscoveryRequestMatcher(
EnvoyProtoData.Node node, VerificationMode verificationMode) {
verify(requestObserver, verificationMode).onNext(argThat(new DiscoveryRequestMatcher(
node.toEnvoyProtoNode(), versionInfo, resources, type.typeUrl(), nonce, null, null)));
}

Expand Down
Loading