Skip to content

Commit

Permalink
Rename activeCpClients to activatedCpClients to avoid overloading the…
Browse files Browse the repository at this point in the history
… term 'active'.

Minor change to stream ready process to avoid sending discovery requests multiple times if the CPC was supporting multiple authorities.
  • Loading branch information
larry-safran committed Nov 8, 2024
1 parent b979d70 commit 5a2e071
Showing 1 changed file with 24 additions and 20 deletions.
44 changes: 24 additions & 20 deletions xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ public void uncaughtException(Thread t, Throwable e) {

private final Map<ServerInfo, LoadStatsManager2> loadStatsManagerMap = new HashMap<>();
final Map<ServerInfo, LoadReportClient> serverLrsClientMap = new HashMap<>();
/** Map of authority to its active control plane client (affected by xds fallback). */
private final Map<String, List<ControlPlaneClient>> activeCpClients = new HashMap<>();
/** Map of authority to its activated control plane client (affected by xds fallback).
* The last entry in the list for each value is the "active" CPC for the matching key */
private final Map<String, List<ControlPlaneClient>> activatedCpClients = new HashMap<>();
private final Map<ServerInfo, ControlPlaneClient> serverCpClientMap = new HashMap<>();

/** Maps resource type to the corresponding map of subscribers (keyed by resource name). */
Expand Down Expand Up @@ -160,7 +161,7 @@ public Map<String, XdsResourceType<?>> getSubscribedResourceTypesWithTypeUrl() {
}

private ControlPlaneClient getActiveCpc(String authority) {
List<ControlPlaneClient> controlPlaneClients = activeCpClients.get(authority);
List<ControlPlaneClient> controlPlaneClients = activatedCpClients.get(authority);
if (controlPlaneClients == null || controlPlaneClients.isEmpty()) {
return null;
}
Expand All @@ -182,7 +183,7 @@ public Collection<String> getSubscribedResources(
return null;
}

Set<String> authorities = activeCpClients.entrySet().stream()
Set<String> authorities = activatedCpClients.entrySet().stream()
.filter(entry -> entry.getValue().contains(targetCpc))
.map(Map.Entry::getKey)
.collect(Collectors.toSet());
Expand Down Expand Up @@ -329,7 +330,7 @@ private <T extends ResourceUpdate> CpcWithFallbackState manageControlPlaneClient

private void addCpcToAuthority(String authority, ControlPlaneClient cpcToUse) {
List<ControlPlaneClient> controlPlaneClients =
activeCpClients.computeIfAbsent(authority, k -> new ArrayList<>());
activatedCpClients.computeIfAbsent(authority, k -> new ArrayList<>());

if (controlPlaneClients.contains(cpcToUse)) {
return;
Expand Down Expand Up @@ -626,17 +627,17 @@ private <T extends ResourceUpdate> void handleResourceUpdate(
// For State of the World services, notify watchers when their watched resource is missing
// from the ADS update. Note that we can only do this if the resource update is coming from
// the same xDS server that the ResourceSubscriber is subscribed to.
if (activeCpClients.get(subscriber.authority).contains(controlPlaneClient)) {
if (activatedCpClients.get(subscriber.authority).contains(controlPlaneClient)) {
subscriber.onAbsent(processingTracker);
}
}
}

private void shutdownLowerPriorityCpcs(ControlPlaneClient activatedCpc) {
// For each authority, remove any control plane clients, with lower priority than the activated
// one, from activeCpClients storing them all in cpcsToShutdown.
// one, from activatedCpClients storing them all in cpcsToShutdown.
Set<ControlPlaneClient> cpcsToShutdown = new HashSet<>();
for ( List<ControlPlaneClient> cpcsForAuth : activeCpClients.values()) {
for ( List<ControlPlaneClient> cpcsForAuth : activatedCpClients.values()) {
if (cpcsForAuth == null) {
continue;
}
Expand All @@ -651,7 +652,7 @@ private void shutdownLowerPriorityCpcs(ControlPlaneClient activatedCpc) {
// used by another authority. If they are still being used let the XDS server know that we
// no longer are interested in subscriptions for authorities we are no longer responsible for.
for (ControlPlaneClient cpc : cpcsToShutdown) {
if (activeCpClients.values().stream().noneMatch(list -> list.contains(cpc))) {
if (activatedCpClients.values().stream().noneMatch(list -> list.contains(cpc))) {
cpc.shutdown();
serverCpClientMap.remove(cpc.getServerInfo());
} else {
Expand Down Expand Up @@ -1029,35 +1030,38 @@ public void handleStreamRestarted(ServerInfo serverInfo) {
return;
}

for (String authority : getAuthoritiesForServerInfo(serverInfo)) {
if (hasSubscribers(authority)) {
internalHandleStreamReady(controlPlaneClient, authority);
boolean needToSendDiscoveryRequests = false;
for (Map.Entry<String, List<ControlPlaneClient>> me : activatedCpClients.entrySet()) {
if (me.getValue().contains(controlPlaneClient)) {
needToSendDiscoveryRequests |= internalHandleStreamReady(controlPlaneClient, me.getKey());
}
}
if (needToSendDiscoveryRequests) {
controlPlaneClient.sendDiscoveryRequests();
}
}

private void internalHandleStreamReady(ControlPlaneClient readyCpc, String authority) {
private boolean internalHandleStreamReady(ControlPlaneClient readyCpc, String authority) {
ControlPlaneClient activeCpClient = getActiveCpc(authority);
if (activeCpClient == null) {
activeCpClients.computeIfAbsent(authority, k -> new ArrayList<>()).add(readyCpc);
activatedCpClients.computeIfAbsent(authority, k -> new ArrayList<>()).add(readyCpc);
restartMatchingSubscriberTimers(authority);
readyCpc.sendDiscoveryRequests();
return; // Since nothing was active can ignore fallback
return true; // Since nothing was active can ignore fallback
}

if (activeCpClient.isReady()
&& !activeCpClients.get(authority).contains(readyCpc)) {
&& !activatedCpClients.get(authority).contains(readyCpc)) {
logger.log(XdsLogLevel.INFO, "Ignoring stream restart for lower priority server {0}",
readyCpc.getServerInfo().target());
return;
return false;
}

if (activeCpClient != readyCpc) {
readyCpc.markStartingUp();
}

restartMatchingSubscriberTimers(authority);
readyCpc.sendDiscoveryRequests();
return true;
}
}

Expand All @@ -1072,7 +1076,7 @@ private CpcWithFallbackState(ControlPlaneClient cpc, boolean didFallback) {
}

private List<String> getActiveAuthorities(ControlPlaneClient cpc) {
return activeCpClients.entrySet().stream()
return activatedCpClients.entrySet().stream()
.filter(entry -> !entry.getValue().isEmpty()
&& cpc == entry.getValue().get(entry.getValue().size() - 1))
.map(Map.Entry::getKey)
Expand Down

0 comments on commit 5a2e071

Please sign in to comment.