Skip to content

Commit

Permalink
Everything works
Browse files Browse the repository at this point in the history
  • Loading branch information
larry-safran committed Aug 16, 2024
1 parent f35fd4b commit 6237cd4
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 15 deletions.
8 changes: 4 additions & 4 deletions xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ void adjustResourceSubscription(XdsResourceType<?> resourceType, String authorit
return;
}

// We will do the rest of the methdo as part of the readyHandler when the stream is ready.
// We will do the rest of the method as part of the readyHandler when the stream is ready.
if (!lastStateWasReady) {
return;
}
Expand Down Expand Up @@ -296,6 +296,7 @@ boolean hasSubscribedResources(String authority) {
public final class RpcRetryTask implements Runnable {
@Override
public void run() {
logger.log(XdsLogLevel.DEBUG, "Retry timeout. Restart ADS stream {0}", logId);
if (shutdown || isReady()) {
return;
}
Expand Down Expand Up @@ -382,6 +383,7 @@ final void sendDiscoveryRequest(XdsResourceType<?> type, Collection<String> reso

@Override
public void onReady() {
logger.log(XdsLogLevel.DEBUG, "ADS stream ready {0}", logId);
if (shutdown || closed) {
return;
}
Expand Down Expand Up @@ -453,9 +455,7 @@ private void handleRpcStreamClosed(Status status) {
// concurrently with the stopwatch and schedule.
long delayNanos = scheduleRpcRetry();

if (status.isOk()) {
logger.log(XdsLogLevel.WARNING, "ADS stream closed, backoff {0} ns", delayNanos);
} else {
if (!status.isOk()) {
String errorMsg = status.getDescription() != null
&& status.getDescription().equals(CLOSED_BY_SERVER)
? "ADS stream closed with status {0}: {1}. Cause: {2}"
Expand Down
8 changes: 3 additions & 5 deletions xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -770,9 +770,7 @@ private final class ResourceSubscriber<T extends ResourceUpdate> {
return;
}

if (controlPlaneClient.isConnected()) {
cpcFixed = true;
} else {
if (!controlPlaneClient.isConnected()) {
controlPlaneClient.connect(); // Make sure we are at least trying to connect
}
} catch (IllegalArgumentException e) {
Expand Down Expand Up @@ -927,7 +925,7 @@ private String getTarget() {
}

private boolean isCpcMutable() {
return !cpcFixed;
return !cpcFixed || type.isSharedName();
}

void onAbsent(@Nullable ProcessingTracker processingTracker) {
Expand All @@ -942,7 +940,7 @@ void onAbsent(@Nullable ProcessingTracker processingTracker) {
&& controlPlaneClient.getServerInfo().ignoreResourceDeletion();
if (ignoreResourceDeletionEnabled && type.isFullStateOfTheWorld() && data != null) {
if (!resourceDeletionIgnored) {
logger.log(XdsLogLevel.FORCE_WARNING,
logger.log(XdsLogLevel.DEBUG,
"xds server {0}: ignoring deletion for resource type {1} name {2}}",
getTarget(), type, resource);
resourceDeletionIgnored = true;
Expand Down
18 changes: 14 additions & 4 deletions xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.junit.runners.JUnit4;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;

Expand Down Expand Up @@ -102,6 +103,8 @@ public void onResourceDoesNotExist(String resourceName) {
private XdsClient.ResourceWatcher<XdsRouteConfigureResource.RdsUpdate> rdsWatcher;
@Mock
private XdsClient.ResourceWatcher<XdsRouteConfigureResource.RdsUpdate> rdsWatcher2;
@Mock
private XdsClient.ResourceWatcher<XdsRouteConfigureResource.RdsUpdate> rdsWatcher3;

private XdsClient.ResourceWatcher<XdsClusterResource.CdsUpdate> raalCdsWatcher =
new XdsClient.ResourceWatcher<XdsClusterResource.CdsUpdate>() {
Expand Down Expand Up @@ -254,6 +257,7 @@ public void mainDown_fallbackUp_restart_main() {
xdsClient.watchXdsResource(XdsClusterResource.getInstance(), FALLBACK_CLUSTER_NAME, cdsWatcher);
inOrder.verify(cdsWatcher, timeout(5000)).onChanged(any());

Mockito.reset(ldsWatcher, rdsWatcher, cdsWatcher, cdsWatcher2);
restartServer(TdServerType.MAIN);

verify(ldsWatcher, timeout(5000)).onChanged(
Expand Down Expand Up @@ -284,31 +288,37 @@ public void connect_then_mainServerDown_fallbackServerUp() {
verify(ldsWatcher, timeout(5000)).onChanged(
XdsListenerResource.LdsUpdate.forApiListener(MAIN_HTTP_CONNECTION_MANAGER));

xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), RDS_NAME, rdsWatcher);
verify(rdsWatcher, timeout(5000)).onChanged(any());

mainTdServer.getServer().shutdownNow();
Mockito.reset(rdsWatcher);

// Shouldn't do fallback since all watchers are loaded
verify(ldsWatcher, timeout(5000).times(0)).onChanged(
XdsListenerResource.LdsUpdate.forApiListener(FALLBACK_HTTP_CONNECTION_MANAGER));

// Should just get from cache
xdsClient.watchXdsResource(XdsListenerResource.getInstance(), MAIN_SERVER, ldsWatcher2);
xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), RDS_NAME, rdsWatcher);
xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), RDS_NAME, rdsWatcher2);
verify(ldsWatcher2, timeout(5000)).onChanged(
XdsListenerResource.LdsUpdate.forApiListener(MAIN_HTTP_CONNECTION_MANAGER));
verify(ldsWatcher, never()).onChanged(
XdsListenerResource.LdsUpdate.forApiListener(FALLBACK_HTTP_CONNECTION_MANAGER));
verify(rdsWatcher, never()).onChanged(any());
verify(rdsWatcher2, timeout(5000)).onChanged(any());

// Asking for something not in cache should force a fallback
xdsClient.watchXdsResource(XdsClusterResource.getInstance(), FALLBACK_CLUSTER_NAME, cdsWatcher);
verify(ldsWatcher, timeout(5000)).onChanged(
XdsListenerResource.LdsUpdate.forApiListener(FALLBACK_HTTP_CONNECTION_MANAGER));
verify(ldsWatcher2, timeout(5000)).onChanged(
XdsListenerResource.LdsUpdate.forApiListener(FALLBACK_HTTP_CONNECTION_MANAGER));
verify(cdsWatcher, timeout(5000)).onChanged(any());
verify(cdsWatcher, timeout(16000)).onChanged(any());

xdsClient.watchXdsResource(
XdsRouteConfigureResource.getInstance(), FALLBACK_RDS_NAME, rdsWatcher2);
verify(rdsWatcher2, timeout(5000)).onChanged(any());
XdsRouteConfigureResource.getInstance(), FALLBACK_RDS_NAME, rdsWatcher3);
verify(rdsWatcher3, timeout(5000)).onChanged(any());

xdsClient.watchXdsResource(
XdsClusterResource.getInstance(), CLUSTER_NAME, cdsWatcher2);
Expand Down
5 changes: 3 additions & 2 deletions xds/src/test/java/io/grpc/xds/XdsTestControlPlaneService.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,9 @@ public void run() {
return;
}
String resourceType = value.getTypeUrl();
if (!value.getResponseNonce().isEmpty()
&& !String.valueOf(xdsNonces.get(resourceType)).equals(value.getResponseNonce())) {
if (!value.getResponseNonce().isEmpty() && xdsNonces.containsKey(resourceType)
&& (!String.valueOf(xdsNonces.get(resourceType).get(responseObserver))
.equals(value.getResponseNonce()))) {
logger.log(Level.FINE, "Resource nonce does not match, ignore.");
return;
}
Expand Down

0 comments on commit 6237cd4

Please sign in to comment.