Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: Add counter and gauge metrics #11661

Merged
merged 13 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions api/src/main/java/io/grpc/NameResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@
@Nullable private final ChannelLogger channelLogger;
@Nullable private final Executor executor;
@Nullable private final String overrideAuthority;
@Nullable private final MetricRecorder metricRecorder;

private Args(
Integer defaultPort,
Expand All @@ -297,7 +298,8 @@
@Nullable ScheduledExecutorService scheduledExecutorService,
@Nullable ChannelLogger channelLogger,
@Nullable Executor executor,
@Nullable String overrideAuthority) {
@Nullable String overrideAuthority,
@Nullable MetricRecorder metricRecorder) {
this.defaultPort = checkNotNull(defaultPort, "defaultPort not set");
this.proxyDetector = checkNotNull(proxyDetector, "proxyDetector not set");
this.syncContext = checkNotNull(syncContext, "syncContext not set");
Expand All @@ -306,6 +308,7 @@
this.channelLogger = channelLogger;
this.executor = executor;
this.overrideAuthority = overrideAuthority;
this.metricRecorder = metricRecorder;
}

/**
Expand Down Expand Up @@ -403,6 +406,14 @@
return overrideAuthority;
}

/**
* Returns the {@link MetricRecorder} that the channel uses to record metrics.
*/
@Nullable
public MetricRecorder getMetricRecorder() {
return metricRecorder;
}


@Override
public String toString() {
Expand All @@ -415,6 +426,7 @@
.add("channelLogger", channelLogger)
.add("executor", executor)
.add("overrideAuthority", overrideAuthority)
.add("metricRecorder", metricRecorder)

Check warning on line 429 in api/src/main/java/io/grpc/NameResolver.java

View check run for this annotation

Codecov / codecov/patch

api/src/main/java/io/grpc/NameResolver.java#L429

Added line #L429 was not covered by tests
.toString();
}

Expand All @@ -433,6 +445,7 @@
builder.setChannelLogger(channelLogger);
builder.setOffloadExecutor(executor);
builder.setOverrideAuthority(overrideAuthority);
builder.setMetricRecorder(metricRecorder);
return builder;
}

Expand All @@ -459,6 +472,7 @@
private ChannelLogger channelLogger;
private Executor executor;
private String overrideAuthority;
private MetricRecorder metricRecorder;

Builder() {
}
Expand Down Expand Up @@ -545,6 +559,14 @@
return this;
}

/**
* See {@link Args#getMetricRecorder()}. This is an optional field.
*/
public Builder setMetricRecorder(MetricRecorder metricRecorder) {
this.metricRecorder = metricRecorder;
return this;
}

/**
* Builds an {@link Args}.
*
Expand All @@ -554,7 +576,8 @@
return
new Args(
defaultPort, proxyDetector, syncContext, serviceConfigParser,
scheduledExecutorService, channelLogger, executor, overrideAuthority);
scheduledExecutorService, channelLogger, executor, overrideAuthority,
metricRecorder);
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions api/src/test/java/io/grpc/NameResolverTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class NameResolverTest {
private final ChannelLogger channelLogger = mock(ChannelLogger.class);
private final Executor executor = Executors.newSingleThreadExecutor();
private final String overrideAuthority = "grpc.io";
private final MetricRecorder metricRecorder = new MetricRecorder() {};
@Mock NameResolver.Listener mockListener;

@Test
Expand All @@ -77,6 +78,7 @@ public void args() {
assertThat(args.getChannelLogger()).isSameInstanceAs(channelLogger);
assertThat(args.getOffloadExecutor()).isSameInstanceAs(executor);
assertThat(args.getOverrideAuthority()).isSameInstanceAs(overrideAuthority);
assertThat(args.getMetricRecorder()).isSameInstanceAs(metricRecorder);

NameResolver.Args args2 = args.toBuilder().build();
assertThat(args2.getDefaultPort()).isEqualTo(defaultPort);
Expand All @@ -87,6 +89,7 @@ public void args() {
assertThat(args2.getChannelLogger()).isSameInstanceAs(channelLogger);
assertThat(args2.getOffloadExecutor()).isSameInstanceAs(executor);
assertThat(args2.getOverrideAuthority()).isSameInstanceAs(overrideAuthority);
assertThat(args.getMetricRecorder()).isSameInstanceAs(metricRecorder);

assertThat(args2).isNotSameInstanceAs(args);
assertThat(args2).isNotEqualTo(args);
Expand All @@ -102,6 +105,7 @@ private NameResolver.Args createArgs() {
.setChannelLogger(channelLogger)
.setOffloadExecutor(executor)
.setOverrideAuthority(overrideAuthority)
.setMetricRecorder(metricRecorder)
.build();
}

Expand Down
5 changes: 3 additions & 2 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,8 @@ ClientStream newSubstream(
builder.maxHedgedAttempts,
loadBalancerFactory);
this.authorityOverride = builder.authorityOverride;
this.metricRecorder = new MetricRecorderImpl(builder.metricSinks,
MetricInstrumentRegistry.getDefaultRegistry());
this.nameResolverArgs =
NameResolver.Args.newBuilder()
.setDefaultPort(builder.getDefaultPort())
Expand All @@ -599,6 +601,7 @@ ClientStream newSubstream(
.setChannelLogger(channelLogger)
.setOffloadExecutor(this.offloadExecutorHolder)
.setOverrideAuthority(this.authorityOverride)
.setMetricRecorder(this.metricRecorder)
.build();
this.nameResolver = getNameResolver(
targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
Expand Down Expand Up @@ -671,8 +674,6 @@ public CallTracer create() {
}
serviceConfigUpdated = true;
}
this.metricRecorder = new MetricRecorderImpl(builder.metricSinks,
MetricInstrumentRegistry.getDefaultRegistry());
}

@VisibleForTesting
Expand Down
25 changes: 25 additions & 0 deletions core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,30 @@ public void metricRecorder_recordsToMetricSink() {
eq(optionalLabelValues));
}

@Test
public void metricRecorder_fromNameResolverArgs_recordsToMetricSink() {
MetricSink mockSink1 = mock(MetricSink.class);
MetricSink mockSink2 = mock(MetricSink.class);
channelBuilder.addMetricSink(mockSink1);
channelBuilder.addMetricSink(mockSink2);
createChannel();

LongCounterMetricInstrument counter = metricInstrumentRegistry.registerLongCounter(
"test_counter", "Time taken by metric recorder", "s",
ImmutableList.of("grpc.method"), Collections.emptyList(), false);
List<String> requiredLabelValues = ImmutableList.of("testMethod");
List<String> optionalLabelValues = Collections.emptyList();

NameResolver.Args args = helper.getNameResolverArgs();
assertThat(args.getMetricRecorder()).isNotNull();
args.getMetricRecorder()
.addLongCounter(counter, 10, requiredLabelValues, optionalLabelValues);
verify(mockSink1).addLongCounter(eq(counter), eq(10L), eq(requiredLabelValues),
eq(optionalLabelValues));
verify(mockSink2).addLongCounter(eq(counter), eq(10L), eq(requiredLabelValues),
eq(optionalLabelValues));
}

@Test
public void shutdownWithNoTransportsEverCreated() {
channelBuilder.nameResolverFactory(
Expand Down Expand Up @@ -2240,6 +2264,7 @@ public void lbHelper_getNameResolverArgs() {
assertThat(args.getSynchronizationContext())
.isSameInstanceAs(helper.getSynchronizationContext());
assertThat(args.getServiceConfigParser()).isNotNull();
assertThat(args.getMetricRecorder()).isNotNull();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.grpc.xds;

import io.grpc.Internal;
import io.grpc.MetricRecorder;
import io.grpc.internal.ObjectPool;
import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsInitializationException;
Expand All @@ -36,6 +37,11 @@

public static ObjectPool<XdsClient> getOrCreate(String target)
throws XdsInitializationException {
return SharedXdsClientPoolProvider.getDefaultProvider().getOrCreate(target);
return getOrCreate(target, new MetricRecorder() {});

Check warning on line 40 in xds/src/main/java/io/grpc/xds/InternalSharedXdsClientPoolProvider.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/InternalSharedXdsClientPoolProvider.java#L40

Added line #L40 was not covered by tests
}

public static ObjectPool<XdsClient> getOrCreate(String target, MetricRecorder metricRecorder)
throws XdsInitializationException {
return SharedXdsClientPoolProvider.getDefaultProvider().getOrCreate(target, metricRecorder);

Check warning on line 45 in xds/src/main/java/io/grpc/xds/InternalSharedXdsClientPoolProvider.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/InternalSharedXdsClientPoolProvider.java#L45

Added line #L45 was not covered by tests
}
}
28 changes: 20 additions & 8 deletions xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import io.grpc.MetricRecorder;
import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ObjectPool;
Expand Down Expand Up @@ -51,6 +52,8 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory {
private static final boolean LOG_XDS_NODE_ID = Boolean.parseBoolean(
System.getenv("GRPC_LOG_XDS_NODE_ID"));
private static final Logger log = Logger.getLogger(XdsClientImpl.class.getName());
private static final ExponentialBackoffPolicy.Provider BACKOFF_POLICY_PROVIDER =
new ExponentialBackoffPolicy.Provider();

private final Bootstrapper bootstrapper;
private final Object lock = new Object();
Expand Down Expand Up @@ -82,7 +85,8 @@ public ObjectPool<XdsClient> get(String target) {
}

@Override
public ObjectPool<XdsClient> getOrCreate(String target) throws XdsInitializationException {
public ObjectPool<XdsClient> getOrCreate(String target, MetricRecorder metricRecorder)
throws XdsInitializationException {
ObjectPool<XdsClient> ref = targetToXdsClientMap.get(target);
if (ref == null) {
synchronized (lock) {
Expand All @@ -98,7 +102,7 @@ public ObjectPool<XdsClient> getOrCreate(String target) throws XdsInitialization
if (bootstrapInfo.servers().isEmpty()) {
throw new XdsInitializationException("No xDS server provided");
}
ref = new RefCountedXdsClientObjectPool(bootstrapInfo, target);
ref = new RefCountedXdsClientObjectPool(bootstrapInfo, target, metricRecorder);
targetToXdsClientMap.put(target, ref);
}
}
Expand All @@ -111,31 +115,33 @@ public ImmutableList<String> getTargets() {
return ImmutableList.copyOf(targetToXdsClientMap.keySet());
}


private static class SharedXdsClientPoolProviderHolder {
private static final SharedXdsClientPoolProvider instance = new SharedXdsClientPoolProvider();
}

@ThreadSafe
@VisibleForTesting
static class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {

private static final ExponentialBackoffPolicy.Provider BACKOFF_POLICY_PROVIDER =
new ExponentialBackoffPolicy.Provider();
private final BootstrapInfo bootstrapInfo;
private final String target; // The target associated with the xDS client.
private final MetricRecorder metricRecorder;
private final Object lock = new Object();
@GuardedBy("lock")
private ScheduledExecutorService scheduler;
@GuardedBy("lock")
private XdsClient xdsClient;
@GuardedBy("lock")
private int refCount;
@GuardedBy("lock")
private XdsClientMetricReporterImpl metricReporter;

@VisibleForTesting
RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo, String target) {
RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo, String target,
MetricRecorder metricRecorder) {
this.bootstrapInfo = checkNotNull(bootstrapInfo);
this.target = target;
this.metricRecorder = metricRecorder;
}

@Override
Expand All @@ -146,6 +152,7 @@ public XdsClient getObject() {
log.log(Level.INFO, "xDS node ID: {0}", bootstrapInfo.node().getId());
}
scheduler = SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);
metricReporter = new XdsClientMetricReporterImpl(metricRecorder, target);
xdsClient = new XdsClientImpl(
DEFAULT_XDS_TRANSPORT_FACTORY,
bootstrapInfo,
Expand All @@ -154,7 +161,9 @@ public XdsClient getObject() {
GrpcUtil.STOPWATCH_SUPPLIER,
TimeProvider.SYSTEM_TIME_PROVIDER,
MessagePrinter.INSTANCE,
new TlsContextManagerImpl(bootstrapInfo));
new TlsContextManagerImpl(bootstrapInfo),
metricReporter);
metricReporter.setXdsClient(xdsClient);
}
refCount++;
return xdsClient;
Expand All @@ -168,6 +177,9 @@ public XdsClient returnObject(Object object) {
if (refCount == 0) {
xdsClient.shutdown();
xdsClient = null;
metricReporter.close();
metricReporter = null;
targetToXdsClientMap.remove(target);
scheduler = SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, scheduler);
}
return null;
Expand Down
Loading