From 62efa6ac825e2a6d33be607601b71cbfe9ff9f4c Mon Sep 17 00:00:00 2001 From: Ikhun Um Date: Wed, 11 Dec 2024 14:21:25 +0900 Subject: [PATCH] Periodically remove inactive connection pool metrics (#6024) Motivation: We observed that threads were blocked when multiple connections were closed simultaneously and the endpoint had a small number of event loops. https://github.com/line/armeria/blob/fa76e99fa6132545df3a8d05eeb81c5681ec8953/core/src/main/java/com/linecorp/armeria/client/ConnectionPoolMetrics.java#L79-L85 We have no exact evidence, but I guess Micrometer's `remove()` operation may take a long time. The other logic is a simple HashMap operation that does not block for a long time. Modifications: - Add a dedicated GC thread to remove inactive meters whose active connections are 0. - A jitter is added to prevent GC from executing simultaneously. - Unsed meters are removed every hour + jitter. - `ConnectionPoolListener` now implements `SafeCloseable` so users should close it when it is unused. Result: - Fix the bug where `EventLoop` is blocked for a long time by `ConnectionPoolListener.metricCollecting()` when a connection is closed. --- .../armeria/client/ClientFactoryBuilder.java | 20 +- .../client/ConnectionPoolListener.java | 8 +- .../armeria/client/ConnectionPoolMetrics.java | 192 +++++++++++++++--- .../armeria/client/HttpClientFactory.java | 7 +- .../ConnectionPoolCollectingMetricTest.java | 53 ++--- .../client/ConnectionPoolMetricsTest.java | 100 +++++++++ .../client/Http1ResponseDecoderTest.java | 3 +- 7 files changed, 320 insertions(+), 63 deletions(-) create mode 100644 core/src/test/java/com/linecorp/armeria/client/ConnectionPoolMetricsTest.java diff --git a/core/src/main/java/com/linecorp/armeria/client/ClientFactoryBuilder.java b/core/src/main/java/com/linecorp/armeria/client/ClientFactoryBuilder.java index 029ffab983e..b7c4f2627f8 100644 --- a/core/src/main/java/com/linecorp/armeria/client/ClientFactoryBuilder.java +++ b/core/src/main/java/com/linecorp/armeria/client/ClientFactoryBuilder.java @@ -131,6 +131,7 @@ public final class ClientFactoryBuilder implements TlsSetters { @Nullable private ClientTlsConfig tlsConfig; private boolean staticTlsSettingsSet; + private boolean autoCloseConnectionPoolListener = true; ClientFactoryBuilder() { connectTimeoutMillis(Flags.defaultConnectTimeoutMillis()); @@ -857,11 +858,26 @@ public ClientFactoryBuilder useHttp1Pipelining(boolean useHttp1Pipelining) { /** * Sets the listener which is notified on a connection pool event. + * Note that the specified {@link ConnectionPoolListener} will be closed automatically when the + * {@link ClientFactory} is closed. + */ + public ClientFactoryBuilder connectionPoolListener(ConnectionPoolListener connectionPoolListener) { + return connectionPoolListener(connectionPoolListener, true); + } + + /** + * Sets the listener which is notified on a connection pool event. + * + *

If {@code autoClose} is true, {@link ConnectionPoolListener#close()} will be automatically called when + * the {@link ClientFactory} is closed. Otherwise, you need to close it manually. {@code autoClose} is + * enabled by default. + * */ public ClientFactoryBuilder connectionPoolListener( - ConnectionPoolListener connectionPoolListener) { + ConnectionPoolListener connectionPoolListener, boolean autoClose) { option(ClientFactoryOptions.CONNECTION_POOL_LISTENER, requireNonNull(connectionPoolListener, "connectionPoolListener")); + autoCloseConnectionPoolListener = autoClose; return this; } @@ -1075,7 +1091,7 @@ private ClientFactoryOptions buildOptions() { * Returns a newly-created {@link ClientFactory} based on the properties of this builder. */ public ClientFactory build() { - return new DefaultClientFactory(new HttpClientFactory(buildOptions())); + return new DefaultClientFactory(new HttpClientFactory(buildOptions(), autoCloseConnectionPoolListener)); } @Override diff --git a/core/src/main/java/com/linecorp/armeria/client/ConnectionPoolListener.java b/core/src/main/java/com/linecorp/armeria/client/ConnectionPoolListener.java index 98759228a03..87cf5fb150d 100644 --- a/core/src/main/java/com/linecorp/armeria/client/ConnectionPoolListener.java +++ b/core/src/main/java/com/linecorp/armeria/client/ConnectionPoolListener.java @@ -20,6 +20,7 @@ import com.linecorp.armeria.common.SessionProtocol; import com.linecorp.armeria.common.annotation.UnstableApi; import com.linecorp.armeria.common.metric.MeterIdPrefix; +import com.linecorp.armeria.common.util.SafeCloseable; import com.linecorp.armeria.common.util.Ticker; import com.linecorp.armeria.common.util.Unwrappable; @@ -29,7 +30,7 @@ /** * Listens to the client connection pool events. */ -public interface ConnectionPoolListener extends Unwrappable { +public interface ConnectionPoolListener extends Unwrappable, SafeCloseable { /** * Returns an instance that does nothing. @@ -130,4 +131,9 @@ void connectionClosed(SessionProtocol protocol, default ConnectionPoolListener unwrap() { return this; } + + @Override + default void close() { + // Do nothing by default. + } } diff --git a/core/src/main/java/com/linecorp/armeria/client/ConnectionPoolMetrics.java b/core/src/main/java/com/linecorp/armeria/client/ConnectionPoolMetrics.java index 79701db418e..db9e33223bc 100644 --- a/core/src/main/java/com/linecorp/armeria/client/ConnectionPoolMetrics.java +++ b/core/src/main/java/com/linecorp/armeria/client/ConnectionPoolMetrics.java @@ -16,14 +16,29 @@ package com.linecorp.armeria.client; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; import com.google.errorprone.annotations.concurrent.GuardedBy; import com.linecorp.armeria.common.SessionProtocol; +import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.metric.MeterIdPrefix; +import com.linecorp.armeria.common.util.SafeCloseable; +import com.linecorp.armeria.common.util.ThreadFactories; import com.linecorp.armeria.internal.common.util.ReentrantShortLock; import io.micrometer.core.instrument.Counter; @@ -32,7 +47,15 @@ import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tag; -final class ConnectionPoolMetrics { +final class ConnectionPoolMetrics implements SafeCloseable { + + private static final Logger logger = LoggerFactory.getLogger(ConnectionPoolMetrics.class); + + private static final ScheduledExecutorService CLEANUP_EXECUTOR = + Executors.newSingleThreadScheduledExecutor( + ThreadFactories.newThreadFactory("armeria-connection-metric-cleanup-executor", + true)); + private static final String PROTOCOL = "protocol"; private static final String REMOTE_IP = "remote.ip"; private static final String LOCAL_IP = "local.ip"; @@ -43,13 +66,27 @@ final class ConnectionPoolMetrics { @GuardedBy("lock") private final Map, Meters> metersMap = new HashMap<>(); private final ReentrantShortLock lock = new ReentrantShortLock(); + private final int cleanupDelaySeconds; + private boolean garbageCollecting; + + private volatile boolean closed; + private volatile ScheduledFuture scheduledFuture; /** * Creates a new instance with the specified {@link Meter} name. */ ConnectionPoolMetrics(MeterRegistry meterRegistry, MeterIdPrefix idPrefix) { + this(meterRegistry, idPrefix, 3600 /* 1 hour */); + } + + @VisibleForTesting + ConnectionPoolMetrics(MeterRegistry meterRegistry, MeterIdPrefix idPrefix, int cleanupDelaySeconds) { this.idPrefix = idPrefix; this.meterRegistry = meterRegistry; + this.cleanupDelaySeconds = cleanupDelaySeconds; + // Schedule a cleanup task to remove unused meters. + scheduledFuture = CLEANUP_EXECUTOR.schedule(this::cleanupInactiveMeters, + nextCleanupDelaySeconds(), TimeUnit.SECONDS); } void increaseConnOpened(SessionProtocol protocol, InetSocketAddress remoteAddr, @@ -57,8 +94,7 @@ void increaseConnOpened(SessionProtocol protocol, InetSocketAddress remoteAddr, final List commonTags = commonTags(protocol, remoteAddr, localAddr); lock.lock(); try { - final Meters meters = metersMap.computeIfAbsent(commonTags, - key -> new Meters(idPrefix, key, meterRegistry)); + final Meters meters = metersMap.computeIfAbsent(commonTags, Meters::new); meters.increment(); } finally { lock.unlock(); @@ -82,61 +118,153 @@ void increaseConnClosed(SessionProtocol protocol, InetSocketAddress remoteAddr, if (meters != null) { meters.decrement(); assert meters.activeConnections() >= 0 : "active connections should not be negative. " + meters; - if (meters.activeConnections() == 0) { - // XXX(ikhoon): Should we consider to remove the gauge lazily so that collectors can get the - // value. - // Remove gauges to be garbage collected because the cardinality of remoteAddr could be - // high. - metersMap.remove(commonTags); - meters.remove(meterRegistry); - } } } finally { lock.unlock(); } } - private static final class Meters { + void cleanupInactiveMeters() { + final List unusedMetersList = new ArrayList<>(); + try { + lock.lock(); + // Prevent meter registration while cleaning up. + garbageCollecting = true; + + // Collect unused meters. + try { + for (final Iterator, Meters>> it = metersMap.entrySet().iterator(); + it.hasNext();) { + final Entry, Meters> entry = it.next(); + final Meters meters = entry.getValue(); + if (meters.activeConnections() == 0) { + unusedMetersList.add(meters); + it.remove(); + } + } + + if (unusedMetersList.isEmpty()) { + garbageCollecting = false; + return; + } + } finally { + lock.unlock(); + } + + // Remove unused meters. + for (Meters meters : unusedMetersList) { + meters.remove(meterRegistry); + } + + // Register metrics for the pending meters. + lock.lock(); + try { + metersMap.values().forEach(Meters::maybeRegisterMetrics); + garbageCollecting = false; + } finally { + lock.unlock(); + } + } catch (Throwable e) { + logger.warn("Failed to cleanup inactive meters.", e); + garbageCollecting = false; + } + + if (closed) { + return; + } + + // Schedule the next cleanup task. + scheduledFuture = CLEANUP_EXECUTOR.schedule(this::cleanupInactiveMeters, + nextCleanupDelaySeconds(), TimeUnit.SECONDS); + } + + private long nextCleanupDelaySeconds() { + // Schedule the cleanup task randomly between cleanupDelayMinutes and 2 * cleanupDelayMinutes. + return cleanupDelaySeconds + ThreadLocalRandom.current().nextInt(cleanupDelaySeconds); + } + + @Override + public void close() { + // This method will be invoked after the connection pool is closed. + closed = true; + final ScheduledFuture scheduledFuture = this.scheduledFuture; + scheduledFuture.cancel(false); + CLEANUP_EXECUTOR.execute(this::cleanupInactiveMeters); + } - private final Counter opened; - private final Counter closed; - private final Gauge active; - private int activeConnections; + private final class Meters { + + private final List commonTags; + + @Nullable + private Counter opened; + @Nullable + private Counter closed; + @Nullable + private Gauge active; + + private int numOpened; + private int numClosed; + + Meters(List commonTags) { + this.commonTags = commonTags; + if (!garbageCollecting) { + maybeRegisterMetrics(); + } + } + + void maybeRegisterMetrics() { + if (opened != null) { + return; + } - Meters(MeterIdPrefix idPrefix, List commonTags, MeterRegistry registry) { opened = Counter.builder(idPrefix.name("connections")) .tags(commonTags) .tag(STATE, "opened") - .register(registry); + .register(meterRegistry); + if (numOpened > 0) { + opened.increment(numOpened); + } + closed = Counter.builder(idPrefix.name("connections")) .tags(commonTags) .tag(STATE, "closed") - .register(registry); + .register(meterRegistry); + if (numClosed > 0) { + closed.increment(numClosed); + } + active = Gauge.builder(idPrefix.name("active.connections"), this, Meters::activeConnections) .tags(commonTags) - .register(registry); + .register(meterRegistry); } - Meters increment() { - activeConnections++; - opened.increment(); - return this; + void increment() { + numOpened++; + if (opened != null) { + opened.increment(); + } } - Meters decrement() { - activeConnections--; - closed.increment(); - return this; + void decrement() { + numClosed++; + if (closed != null) { + closed.increment(); + } } int activeConnections() { - return activeConnections; + return numOpened - numClosed; } void remove(MeterRegistry registry) { - registry.remove(opened); - registry.remove(closed); - registry.remove(active); + if (opened != null) { + assert closed != null; + assert active != null; + registry.remove(opened); + registry.remove(closed); + registry.remove(active); + } } } } diff --git a/core/src/main/java/com/linecorp/armeria/client/HttpClientFactory.java b/core/src/main/java/com/linecorp/armeria/client/HttpClientFactory.java index d4d7aacb279..35ad761e790 100644 --- a/core/src/main/java/com/linecorp/armeria/client/HttpClientFactory.java +++ b/core/src/main/java/com/linecorp/armeria/client/HttpClientFactory.java @@ -138,9 +138,10 @@ private static void setupTlsMetrics(List certificates, MeterReg () -> RequestContext.mapCurrent( ctx -> ctx.eventLoop().withoutContext(), () -> eventLoopGroup().next()); private final ClientFactoryOptions options; + private final boolean autoCloseConnectionPoolListener; private final AsyncCloseableSupport closeable = AsyncCloseableSupport.of(this::closeAsync); - HttpClientFactory(ClientFactoryOptions options) { + HttpClientFactory(ClientFactoryOptions options, boolean autoCloseConnectionPoolListener) { workerGroup = options.workerGroup(); @SuppressWarnings("unchecked") @@ -225,6 +226,7 @@ private static void setupTlsMetrics(List certificates, MeterReg maxConnectionAgeMillis = options.maxConnectionAgeMillis(); maxNumRequestsPerConnection = options.maxNumRequestsPerConnection(); channelPipelineCustomizer = options.channelPipelineCustomizer(); + this.autoCloseConnectionPoolListener = autoCloseConnectionPoolListener; this.options = options; @@ -461,6 +463,9 @@ private void closeAsync(CompletableFuture future) { logger.warn("Failed to close {}s:", HttpChannelPool.class.getSimpleName(), cause); } + if (autoCloseConnectionPoolListener) { + connectionPoolListener.close(); + } if (shutdownWorkerGroupOnClose) { workerGroup.shutdownGracefully().addListener((FutureListener) f -> { if (f.cause() != null) { diff --git a/core/src/test/java/com/linecorp/armeria/client/ConnectionPoolCollectingMetricTest.java b/core/src/test/java/com/linecorp/armeria/client/ConnectionPoolCollectingMetricTest.java index ffd06347c00..d5e0438cde3 100644 --- a/core/src/test/java/com/linecorp/armeria/client/ConnectionPoolCollectingMetricTest.java +++ b/core/src/test/java/com/linecorp/armeria/client/ConnectionPoolCollectingMetricTest.java @@ -46,16 +46,16 @@ void shouldCollectConnectionPoolEvents() throws Exception { final InetSocketAddress addressA = new InetSocketAddress("10.10.10.10", 3333); final InetSocketAddress addressB = new InetSocketAddress("10.10.10.11", 3333); - final String openABMetricKey = "armeria.client.connections#count{local.ip=10.10.10.11," + - "protocol=H1,remote.ip=10.10.10.10,state=opened}"; - final String closedABMetricKey = "armeria.client.connections#count{local.ip=10.10.10.11," + - "protocol=H1,remote.ip=10.10.10.10,state=closed}"; - final String activeABMetricKey = "armeria.client.active.connections#value{local.ip=10.10.10.11," + - "protocol=H1,remote.ip=10.10.10.10}"; - final String openBAMetricKey = "armeria.client.connections#count{local.ip=10.10.10.10," + - "protocol=H1,remote.ip=10.10.10.11,state=opened}"; - final String activeBAMetricKey = "armeria.client.active.connections#value{local.ip=10.10.10.10," + - "protocol=H1,remote.ip=10.10.10.11}"; + final String openABMetricKey = "armeria.client.connections#count{" + + "local.ip=10.10.10.11,protocol=H1,remote.ip=10.10.10.10,state=opened}"; + final String closedABMetricKey = "armeria.client.connections#count{" + + "local.ip=10.10.10.11,protocol=H1,remote.ip=10.10.10.10,state=closed}"; + final String activeABMetricKey = "armeria.client.active.connections#value{" + + "local.ip=10.10.10.11,protocol=H1,remote.ip=10.10.10.10}"; + final String openBAMetricKey = "armeria.client.connections#count{" + + "local.ip=10.10.10.10,protocol=H1,remote.ip=10.10.10.11,state=opened}"; + final String activeBAMetricKey = "armeria.client.active.connections#value{" + + "local.ip=10.10.10.10,protocol=H1,remote.ip=10.10.10.11}"; final AttributeMap attributeMap = new DefaultAttributeMap(); @@ -64,43 +64,44 @@ void shouldCollectConnectionPoolEvents() throws Exception { assertThat(MoreMeters.measureAll(registry)).containsEntry(activeABMetricKey, 1.0); connectionPoolListener.connectionClosed(SessionProtocol.H1, addressA, addressB, attributeMap); - // If the number of connections is 0, the metric is not collected. + // Although the number of active connections is 0, the metrics will be not removed immediately but + // after an hour. assertThat(MoreMeters.measureAll(registry)) - .doesNotContainKey(openABMetricKey) - .doesNotContainKey(closedABMetricKey) - .doesNotContainKey(activeABMetricKey); + .containsEntry(openABMetricKey, 1.0) + .containsEntry(closedABMetricKey, 1.0) + .containsEntry(activeABMetricKey, 0.0); connectionPoolListener.connectionOpen(SessionProtocol.H1, addressA, addressB, attributeMap); assertThat(MoreMeters.measureAll(registry)) - .containsEntry(openABMetricKey, 1.0) - .containsEntry(closedABMetricKey, 0.0) + .containsEntry(openABMetricKey, 2.0) + .containsEntry(closedABMetricKey, 1.0) .containsEntry(activeABMetricKey, 1.0); connectionPoolListener.connectionOpen(SessionProtocol.H1, addressA, addressB, attributeMap); assertThat(MoreMeters.measureAll(registry)) - .containsEntry(openABMetricKey, 2.0) - .containsEntry(closedABMetricKey, 0.0) + .containsEntry(openABMetricKey, 3.0) + .containsEntry(closedABMetricKey, 1.0) .containsEntry(activeABMetricKey, 2.0); connectionPoolListener.connectionOpen(SessionProtocol.H1, addressB, addressA, attributeMap); assertThat(MoreMeters.measureAll(registry)) - .containsEntry(openABMetricKey, 2.0) - .containsEntry(closedABMetricKey, 0.0) + .containsEntry(openABMetricKey, 3.0) + .containsEntry(closedABMetricKey, 1.0) .containsEntry(activeABMetricKey, 2.0) .containsEntry(openBAMetricKey, 1.0) .containsEntry(activeBAMetricKey, 1.0); connectionPoolListener.connectionClosed(SessionProtocol.H1, addressA, addressB, attributeMap); assertThat(MoreMeters.measureAll(registry)) - .containsEntry(openABMetricKey, 2.0) - .containsEntry(closedABMetricKey, 1.0) + .containsEntry(openABMetricKey, 3.0) + .containsEntry(closedABMetricKey, 2.0) .containsEntry(activeABMetricKey, 1.0) .containsEntry(openBAMetricKey, 1.0) .containsEntry(activeBAMetricKey, 1.0); connectionPoolListener.connectionClosed(SessionProtocol.H1, addressB, addressA, attributeMap); assertThat(MoreMeters.measureAll(registry)) - .containsEntry(openABMetricKey, 2.0) - .containsEntry(closedABMetricKey, 1.0) + .containsEntry(openABMetricKey, 3.0) + .containsEntry(closedABMetricKey, 2.0) .containsEntry(activeABMetricKey, 1.0) - .doesNotContainKey(openBAMetricKey) - .doesNotContainKey(activeBAMetricKey); + .containsEntry(openBAMetricKey, 1.0) + .containsEntry(activeBAMetricKey, 0.0); } } diff --git a/core/src/test/java/com/linecorp/armeria/client/ConnectionPoolMetricsTest.java b/core/src/test/java/com/linecorp/armeria/client/ConnectionPoolMetricsTest.java new file mode 100644 index 00000000000..fc5423a2ecd --- /dev/null +++ b/core/src/test/java/com/linecorp/armeria/client/ConnectionPoolMetricsTest.java @@ -0,0 +1,100 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + */ + +package com.linecorp.armeria.client; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + +import org.junit.jupiter.api.Test; + +import com.linecorp.armeria.common.SessionProtocol; +import com.linecorp.armeria.common.metric.MeterIdPrefix; +import com.linecorp.armeria.common.metric.MoreMeters; + +import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; + +class ConnectionPoolMetricsTest { + + @Test + void shouldRemoveInactiveMetricsPeriodically() { + final TestMeterRemovalListener removalListener = new TestMeterRemovalListener(); + final SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry(); + meterRegistry.config().onMeterRemoved(removalListener); + final ConnectionPoolMetrics metrics = new ConnectionPoolMetrics(meterRegistry, + new MeterIdPrefix("test"), + 2); + + final InetSocketAddress remoteAddr1 = new InetSocketAddress("1.1.1.1", 80); + final InetSocketAddress localAddr1 = new InetSocketAddress("1.1.1.2", 80); + final InetSocketAddress remoteAddr2 = new InetSocketAddress("2.2.2.1", 80); + final InetSocketAddress localAddr2 = new InetSocketAddress("2.2.2.2", 80); + final InetSocketAddress remoteAddr3 = new InetSocketAddress("3.3.3.1", 80); + final InetSocketAddress localAddr3 = new InetSocketAddress("3.3.3.2", 80); + metrics.increaseConnOpened(SessionProtocol.HTTP, remoteAddr1, localAddr1); + metrics.increaseConnOpened(SessionProtocol.HTTP, remoteAddr1, localAddr1); + metrics.increaseConnClosed(SessionProtocol.HTTP, remoteAddr1, localAddr1); + metrics.increaseConnOpened(SessionProtocol.HTTP, remoteAddr2, localAddr2); + final Map meters = MoreMeters.measureAll(meterRegistry); + assertThat(meters).containsEntry( + "test.active.connections#value{local.ip=1.1.1.2,protocol=HTTP,remote.ip=1.1.1.1}", 1.0); + assertThat(meters).containsEntry( + "test.active.connections#value{local.ip=2.2.2.2,protocol=HTTP,remote.ip=2.2.2.1}", 1.0); + + metrics.increaseConnClosed(SessionProtocol.HTTP, remoteAddr1, localAddr1); + + // GC is working. + await().untilTrue(removalListener.removing); + // Make sure metrics are collected while GC is working. + metrics.increaseConnOpened(SessionProtocol.HTTP, remoteAddr3, localAddr3); + // Meters wasn't updated yet. + final Map meters1 = MoreMeters.measureAll(meterRegistry); + assertThat(meters1).doesNotContainKey( + "test.active.connections#value{local.ip=3.3.3.2,protocol=HTTP,remote.ip=3.3.3.1}"); + + // GC is done. + removalListener.waiting.complete(null); + await().untilAsserted(() -> { + final Map meters0 = MoreMeters.measureAll(meterRegistry); + assertThat(meters0).doesNotContainKey( + "test.active.connections#value{local.ip=1.1.1.2,protocol=HTTP,remote.ip=1.1.1.1}"); + assertThat(meters0).containsEntry( + "test.active.connections#value{local.ip=2.2.2.2,protocol=HTTP,remote.ip=2.2.2.1}", 1.0); + assertThat(meters0).containsEntry( + "test.active.connections#value{local.ip=3.3.3.2,protocol=HTTP,remote.ip=3.3.3.1}", 1.0); + }); + } + + private static final class TestMeterRemovalListener implements Consumer { + + final AtomicBoolean removing = new AtomicBoolean(); + final CompletableFuture waiting = new CompletableFuture<>(); + + @Override + public void accept(Meter meter) { + removing.set(true); + waiting.join(); + } + } +} diff --git a/core/src/test/java/com/linecorp/armeria/client/Http1ResponseDecoderTest.java b/core/src/test/java/com/linecorp/armeria/client/Http1ResponseDecoderTest.java index c600209cc17..318da8e8099 100644 --- a/core/src/test/java/com/linecorp/armeria/client/Http1ResponseDecoderTest.java +++ b/core/src/test/java/com/linecorp/armeria/client/Http1ResponseDecoderTest.java @@ -35,7 +35,8 @@ class Http1ResponseDecoderTest { @Test void testRequestTimeoutClosesImmediately() throws Exception { final EmbeddedChannel channel = new EmbeddedChannel(); - try (HttpClientFactory httpClientFactory = new HttpClientFactory(ClientFactoryOptions.of())) { + try (HttpClientFactory httpClientFactory = new HttpClientFactory(ClientFactoryOptions.of(), + true)) { final Http1ResponseDecoder decoder = new Http1ResponseDecoder( channel, httpClientFactory, SessionProtocol.H1); channel.pipeline().addLast(decoder);