Skip to content

Commit

Permalink
Use a Closeable for resource subscription for a cleaner interface.
Browse files Browse the repository at this point in the history
  • Loading branch information
larry-safran committed Dec 14, 2024
1 parent 6d1a294 commit 579f4e0
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@

package io.grpc.xds;

import java.io.Closeable;

public interface XdsClusterSubscriptionRegistry {
ClusterSubscription subscribeToCluster(String clusterName);
void releaseSubscription(ClusterSubscription subscription);
void refreshDynamicSubscriptions();
Closeable subscribeToCluster(String clusterName);

interface ClusterSubscription {
String getClusterName();
}
void refreshDynamicSubscriptions();
}
48 changes: 27 additions & 21 deletions xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import io.grpc.xds.client.XdsClient.ResourceWatcher;
import io.grpc.xds.client.XdsLogger;
import io.grpc.xds.client.XdsResourceType;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
Expand All @@ -38,6 +40,7 @@
* maintains the watchers for the xds resources and when an update is received, it either requests
* referenced resources or updates the XdsConfig and notifies the XdsConfigWatcher.
*/
@SuppressWarnings("unused") // TODO remove when changes for A74 are fully implemented
final class XdsDependencyManager implements XdsClusterSubscriptionRegistry {
private final XdsClient xdsClient;
private final XdsConfigWatcher xdsConfigWatcher;
Expand Down Expand Up @@ -67,7 +70,7 @@ final class XdsDependencyManager implements XdsClusterSubscriptionRegistry {
@Override
public ClusterSubscription subscribeToCluster(String clusterName) {
checkNotNull(clusterName, "clusterName");
ClusterSubscription subscription = new ClusterSubscriptionImpl(clusterName);
ClusterSubscription subscription = new ClusterSubscription(clusterName);

Set<ClusterSubscription> localSubscriptions =
clusterSubscriptions.computeIfAbsent(clusterName, k -> new HashSet<>());
Expand All @@ -77,23 +80,6 @@ public ClusterSubscription subscribeToCluster(String clusterName) {
return subscription;
}

@Override
public void releaseSubscription(ClusterSubscription subscription) {
checkNotNull(subscription, "subscription");
String clusterName = subscription.getClusterName();
Set<ClusterSubscription> subscriptions = clusterSubscriptions.get(clusterName);
if (subscriptions == null) {
logger.log(DEBUG, "Subscription already released for {0}", clusterName);
return;
}

subscriptions.remove(subscription);
if (subscriptions.isEmpty()) {
clusterSubscriptions.remove(clusterName);
// TODO release XdsClient watches for this cluster and its endpoint
}
}

@Override
public void refreshDynamicSubscriptions() {
// TODO: implement
Expand Down Expand Up @@ -130,6 +116,22 @@ private <T extends ResourceUpdate> void shutdownWatchersForType(TypeWatchers<T>
}
}

private void releaseSubscription(ClusterSubscription subscription) {
checkNotNull(subscription, "subscription");
String clusterName = subscription.getClusterName();
Set<ClusterSubscription> subscriptions = clusterSubscriptions.get(clusterName);
if (subscriptions == null) {
logger.log(DEBUG, "Subscription already released for {0}", clusterName);
return;
}

subscriptions.remove(subscription);
if (subscriptions.isEmpty()) {
clusterSubscriptions.remove(clusterName);
// TODO release XdsClient watches for this cluster and its endpoint
}
}

@Override
public String toString() {
return logId.toString();
Expand Down Expand Up @@ -173,17 +175,21 @@ public interface XdsConfigWatcher {
void onResourceDoesNotExist(ResourceContext<?> resourceContext);
}

public static final class ClusterSubscriptionImpl implements ClusterSubscription {
private class ClusterSubscription implements Closeable {
String clusterName;

public ClusterSubscriptionImpl(String clusterName) {
public ClusterSubscription(String clusterName) {
this.clusterName = clusterName;
}

@Override
public String getClusterName() {
return clusterName;
}

@Override
public void close() throws IOException {
releaseSubscription(this);
}
}

private abstract class XdsWatcherBase<T extends ResourceUpdate> implements ResourceWatcher<T> {
Expand Down

0 comments on commit 579f4e0

Please sign in to comment.