Skip to content

Commit

Permalink
Everything except fallback working again
Browse files Browse the repository at this point in the history
  • Loading branch information
larry-safran committed Aug 15, 2024
1 parent 0b1f3e0 commit f35fd4b
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 17 deletions.
16 changes: 16 additions & 0 deletions xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ void adjustResourceSubscription(XdsResourceType<?> resourceType, String authorit
return;
}

// We will do the rest of the methdo as part of the readyHandler when the stream is ready.
if (!lastStateWasReady) {
return;
}

Collection<String> resources =
resourceStore.getSubscribedResources(serverInfo, resourceType, authority);
if (resources == null) {
Expand Down Expand Up @@ -276,6 +281,17 @@ void sendDiscoveryRequests(String authority) {
}
}

boolean hasSubscribedResources(String authority) {
for (XdsResourceType type : resourceStore.getSubscribedResourceTypesWithTypeUrl().values()) {
Collection subscribedResources =
resourceStore.getSubscribedResources(serverInfo, type, authority);
if (subscribedResources != null && !subscribedResources.isEmpty()) {
return true;
}
}
return false;
}

@VisibleForTesting
public final class RpcRetryTask implements Runnable {
@Override
Expand Down
2 changes: 1 addition & 1 deletion xds/src/main/java/io/grpc/xds/client/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ Collection<String> getSubscribedResources(
*/
default void assignResourcesToOwner(XdsResourceType<?> type, Collection<String> resources,
Object owner) {
// no-op - useful for test cases where everything is mocked
// Default is a no-op implemention which is useful for test cases where everything is mocked
}
}
}
41 changes: 25 additions & 16 deletions xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.grpc.xds.client.XdsClient.ResourceStore;
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -285,6 +286,10 @@ public void run() {
subscriber = new ResourceSubscriber<>(type, resourceName);
resourceSubscribers.get(type).put(resourceName, subscriber);
subscriber.addWatcher(watcher, watcherExecutor);
if (subscriber.errorDescription != null) {
return;
}

ControlPlaneClient cpcToUse;
if (subscriber.controlPlaneClient == null) {
cpcToUse = activeCpClients.get(subscriber.authority);
Expand Down Expand Up @@ -464,15 +469,9 @@ public ControlPlaneClient getOrCreateControlPlaneClient(ImmutableList<ServerInfo
}
return controlPlaneClient;
} else {
try {
ControlPlaneClient cpc = getOrCreateControlPlaneClient(serverInfo);
logger.log(XdsLogLevel.DEBUG, "Created control plane client {0}", cpc);
return cpc;
} catch (Exception e) {
logger.log(XdsLogLevel.WARNING, "Failed to create control plane client for {0}: {1}",
serverInfo.target(), e);
return null;
}
ControlPlaneClient cpc = getOrCreateControlPlaneClient(serverInfo);
logger.log(XdsLogLevel.DEBUG, "Created control plane client {0}", cpc);
return cpc;
}
}

Expand Down Expand Up @@ -547,6 +546,21 @@ private ImmutableList<ServerInfo> getServerInfos(String authority) {
}
}

private List<String> getAuthoritiesForServerInfo(ServerInfo serverInfo) {
List<String> authorities = new ArrayList<>();
for (Map.Entry<String, AuthorityInfo> entry : bootstrapInfo.authorities().entrySet()) {
if (entry.getValue().xdsServers().contains(serverInfo)) {
authorities.add(entry.getKey());
}
}

if (bootstrapInfo.servers().contains(serverInfo)) {
authorities.add(null);
}

return authorities;
}

@SuppressWarnings("unchecked")
private <T extends ResourceUpdate> void handleResourceUpdate(
XdsResourceType.Args args, List<Any> resources, XdsResourceType<T> xdsResourceType,
Expand Down Expand Up @@ -1080,8 +1094,8 @@ public void handleStreamReady(ServerInfo serverInfo) {
return;
}

for (String authority : activeCpClients.keySet()) {
if (servesAuthority(serverInfo, authority)) {
for (String authority : getAuthoritiesForServerInfo(serverInfo)) {
if (controlPlaneClient.hasSubscribedResources(authority)) {
internalHandleStreamReady(serverInfo, controlPlaneClient, authority);
}
}
Expand Down Expand Up @@ -1146,11 +1160,6 @@ private void restartMatchingSubscriberTimers(
}
}

private boolean servesAuthority(ServerInfo serverInfo, String authority) {
ImmutableList<ServerInfo> serverInfos = getServerInfos(authority);
return serverInfos != null && serverInfos.contains(serverInfo);
}

private int compareCpClients(ControlPlaneClient base, ControlPlaneClient other,
String authority) {
if (base == other || other == null) {
Expand Down

0 comments on commit f35fd4b

Please sign in to comment.