Skip to content

Commit

Permalink
Periodically remove inactive connection pool metrics (#6024)
Browse files Browse the repository at this point in the history
Motivation:

We observed that threads were blocked when multiple connections were
closed simultaneously and the endpoint had a small number of event
loops.

https://github.com/line/armeria/blob/fa76e99fa6132545df3a8d05eeb81c5681ec8953/core/src/main/java/com/linecorp/armeria/client/ConnectionPoolMetrics.java#L79-L85

We have no exact evidence, but I guess Micrometer's `remove()` operation
may take a long time. The other logic is a simple HashMap operation that
does not block for a long time.

Modifications:

- Add a dedicated GC thread to remove inactive meters whose active
connections are 0.
  - A jitter is added to prevent GC from executing simultaneously.
  - Unsed meters are removed every hour + jitter.
- `ConnectionPoolListener` now implements `SafeCloseable` so users
should close it when it is unused.

Result:

- Fix the bug where `EventLoop` is blocked for a long time by
`ConnectionPoolListener.metricCollecting()` when a connection is closed.
  • Loading branch information
ikhoon committed Dec 11, 2024
1 parent 93f9f28 commit 62efa6a
Show file tree
Hide file tree
Showing 7 changed files with 320 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public final class ClientFactoryBuilder implements TlsSetters {
@Nullable
private ClientTlsConfig tlsConfig;
private boolean staticTlsSettingsSet;
private boolean autoCloseConnectionPoolListener = true;

ClientFactoryBuilder() {
connectTimeoutMillis(Flags.defaultConnectTimeoutMillis());
Expand Down Expand Up @@ -857,11 +858,26 @@ public ClientFactoryBuilder useHttp1Pipelining(boolean useHttp1Pipelining) {

/**
* Sets the listener which is notified on a connection pool event.
* Note that the specified {@link ConnectionPoolListener} will be closed automatically when the
* {@link ClientFactory} is closed.
*/
public ClientFactoryBuilder connectionPoolListener(ConnectionPoolListener connectionPoolListener) {
return connectionPoolListener(connectionPoolListener, true);
}

/**
* Sets the listener which is notified on a connection pool event.
*
* <p>If {@code autoClose} is true, {@link ConnectionPoolListener#close()} will be automatically called when
* the {@link ClientFactory} is closed. Otherwise, you need to close it manually. {@code autoClose} is
* enabled by default.
*
*/
public ClientFactoryBuilder connectionPoolListener(
ConnectionPoolListener connectionPoolListener) {
ConnectionPoolListener connectionPoolListener, boolean autoClose) {
option(ClientFactoryOptions.CONNECTION_POOL_LISTENER,
requireNonNull(connectionPoolListener, "connectionPoolListener"));
autoCloseConnectionPoolListener = autoClose;
return this;
}

Expand Down Expand Up @@ -1075,7 +1091,7 @@ private ClientFactoryOptions buildOptions() {
* Returns a newly-created {@link ClientFactory} based on the properties of this builder.
*/
public ClientFactory build() {
return new DefaultClientFactory(new HttpClientFactory(buildOptions()));
return new DefaultClientFactory(new HttpClientFactory(buildOptions(), autoCloseConnectionPoolListener));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.annotation.UnstableApi;
import com.linecorp.armeria.common.metric.MeterIdPrefix;
import com.linecorp.armeria.common.util.SafeCloseable;
import com.linecorp.armeria.common.util.Ticker;
import com.linecorp.armeria.common.util.Unwrappable;

Expand All @@ -29,7 +30,7 @@
/**
* Listens to the client connection pool events.
*/
public interface ConnectionPoolListener extends Unwrappable {
public interface ConnectionPoolListener extends Unwrappable, SafeCloseable {

/**
* Returns an instance that does nothing.
Expand Down Expand Up @@ -130,4 +131,9 @@ void connectionClosed(SessionProtocol protocol,
default ConnectionPoolListener unwrap() {
return this;
}

@Override
default void close() {
// Do nothing by default.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,29 @@
package com.linecorp.armeria.client;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.errorprone.annotations.concurrent.GuardedBy;

import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.metric.MeterIdPrefix;
import com.linecorp.armeria.common.util.SafeCloseable;
import com.linecorp.armeria.common.util.ThreadFactories;
import com.linecorp.armeria.internal.common.util.ReentrantShortLock;

import io.micrometer.core.instrument.Counter;
Expand All @@ -32,7 +47,15 @@
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;

final class ConnectionPoolMetrics {
final class ConnectionPoolMetrics implements SafeCloseable {

private static final Logger logger = LoggerFactory.getLogger(ConnectionPoolMetrics.class);

private static final ScheduledExecutorService CLEANUP_EXECUTOR =
Executors.newSingleThreadScheduledExecutor(
ThreadFactories.newThreadFactory("armeria-connection-metric-cleanup-executor",
true));

private static final String PROTOCOL = "protocol";
private static final String REMOTE_IP = "remote.ip";
private static final String LOCAL_IP = "local.ip";
Expand All @@ -43,22 +66,35 @@ final class ConnectionPoolMetrics {
@GuardedBy("lock")
private final Map<List<Tag>, Meters> metersMap = new HashMap<>();
private final ReentrantShortLock lock = new ReentrantShortLock();
private final int cleanupDelaySeconds;
private boolean garbageCollecting;

private volatile boolean closed;
private volatile ScheduledFuture<?> scheduledFuture;

/**
* Creates a new instance with the specified {@link Meter} name.
*/
ConnectionPoolMetrics(MeterRegistry meterRegistry, MeterIdPrefix idPrefix) {
this(meterRegistry, idPrefix, 3600 /* 1 hour */);
}

@VisibleForTesting
ConnectionPoolMetrics(MeterRegistry meterRegistry, MeterIdPrefix idPrefix, int cleanupDelaySeconds) {
this.idPrefix = idPrefix;
this.meterRegistry = meterRegistry;
this.cleanupDelaySeconds = cleanupDelaySeconds;
// Schedule a cleanup task to remove unused meters.
scheduledFuture = CLEANUP_EXECUTOR.schedule(this::cleanupInactiveMeters,
nextCleanupDelaySeconds(), TimeUnit.SECONDS);
}

void increaseConnOpened(SessionProtocol protocol, InetSocketAddress remoteAddr,
InetSocketAddress localAddr) {
final List<Tag> commonTags = commonTags(protocol, remoteAddr, localAddr);
lock.lock();
try {
final Meters meters = metersMap.computeIfAbsent(commonTags,
key -> new Meters(idPrefix, key, meterRegistry));
final Meters meters = metersMap.computeIfAbsent(commonTags, Meters::new);
meters.increment();
} finally {
lock.unlock();
Expand All @@ -82,61 +118,153 @@ void increaseConnClosed(SessionProtocol protocol, InetSocketAddress remoteAddr,
if (meters != null) {
meters.decrement();
assert meters.activeConnections() >= 0 : "active connections should not be negative. " + meters;
if (meters.activeConnections() == 0) {
// XXX(ikhoon): Should we consider to remove the gauge lazily so that collectors can get the
// value.
// Remove gauges to be garbage collected because the cardinality of remoteAddr could be
// high.
metersMap.remove(commonTags);
meters.remove(meterRegistry);
}
}
} finally {
lock.unlock();
}
}

private static final class Meters {
void cleanupInactiveMeters() {
final List<Meters> unusedMetersList = new ArrayList<>();
try {
lock.lock();
// Prevent meter registration while cleaning up.
garbageCollecting = true;

// Collect unused meters.
try {
for (final Iterator<Entry<List<Tag>, Meters>> it = metersMap.entrySet().iterator();
it.hasNext();) {
final Entry<List<Tag>, Meters> entry = it.next();
final Meters meters = entry.getValue();
if (meters.activeConnections() == 0) {
unusedMetersList.add(meters);
it.remove();
}
}

if (unusedMetersList.isEmpty()) {
garbageCollecting = false;
return;
}
} finally {
lock.unlock();
}

// Remove unused meters.
for (Meters meters : unusedMetersList) {
meters.remove(meterRegistry);
}

// Register metrics for the pending meters.
lock.lock();
try {
metersMap.values().forEach(Meters::maybeRegisterMetrics);
garbageCollecting = false;
} finally {
lock.unlock();
}
} catch (Throwable e) {
logger.warn("Failed to cleanup inactive meters.", e);
garbageCollecting = false;
}

if (closed) {
return;
}

// Schedule the next cleanup task.
scheduledFuture = CLEANUP_EXECUTOR.schedule(this::cleanupInactiveMeters,
nextCleanupDelaySeconds(), TimeUnit.SECONDS);
}

private long nextCleanupDelaySeconds() {
// Schedule the cleanup task randomly between cleanupDelayMinutes and 2 * cleanupDelayMinutes.
return cleanupDelaySeconds + ThreadLocalRandom.current().nextInt(cleanupDelaySeconds);
}

@Override
public void close() {
// This method will be invoked after the connection pool is closed.
closed = true;
final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
scheduledFuture.cancel(false);
CLEANUP_EXECUTOR.execute(this::cleanupInactiveMeters);
}

private final Counter opened;
private final Counter closed;
private final Gauge active;
private int activeConnections;
private final class Meters {

private final List<Tag> commonTags;

@Nullable
private Counter opened;
@Nullable
private Counter closed;
@Nullable
private Gauge active;

private int numOpened;
private int numClosed;

Meters(List<Tag> commonTags) {
this.commonTags = commonTags;
if (!garbageCollecting) {
maybeRegisterMetrics();
}
}

void maybeRegisterMetrics() {
if (opened != null) {
return;
}

Meters(MeterIdPrefix idPrefix, List<Tag> commonTags, MeterRegistry registry) {
opened = Counter.builder(idPrefix.name("connections"))
.tags(commonTags)
.tag(STATE, "opened")
.register(registry);
.register(meterRegistry);
if (numOpened > 0) {
opened.increment(numOpened);
}

closed = Counter.builder(idPrefix.name("connections"))
.tags(commonTags)
.tag(STATE, "closed")
.register(registry);
.register(meterRegistry);
if (numClosed > 0) {
closed.increment(numClosed);
}

active = Gauge.builder(idPrefix.name("active.connections"), this, Meters::activeConnections)
.tags(commonTags)
.register(registry);
.register(meterRegistry);
}

Meters increment() {
activeConnections++;
opened.increment();
return this;
void increment() {
numOpened++;
if (opened != null) {
opened.increment();
}
}

Meters decrement() {
activeConnections--;
closed.increment();
return this;
void decrement() {
numClosed++;
if (closed != null) {
closed.increment();
}
}

int activeConnections() {
return activeConnections;
return numOpened - numClosed;
}

void remove(MeterRegistry registry) {
registry.remove(opened);
registry.remove(closed);
registry.remove(active);
if (opened != null) {
assert closed != null;
assert active != null;
registry.remove(opened);
registry.remove(closed);
registry.remove(active);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,10 @@ private static void setupTlsMetrics(List<X509Certificate> certificates, MeterReg
() -> RequestContext.mapCurrent(
ctx -> ctx.eventLoop().withoutContext(), () -> eventLoopGroup().next());
private final ClientFactoryOptions options;
private final boolean autoCloseConnectionPoolListener;
private final AsyncCloseableSupport closeable = AsyncCloseableSupport.of(this::closeAsync);

HttpClientFactory(ClientFactoryOptions options) {
HttpClientFactory(ClientFactoryOptions options, boolean autoCloseConnectionPoolListener) {
workerGroup = options.workerGroup();

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -225,6 +226,7 @@ private static void setupTlsMetrics(List<X509Certificate> certificates, MeterReg
maxConnectionAgeMillis = options.maxConnectionAgeMillis();
maxNumRequestsPerConnection = options.maxNumRequestsPerConnection();
channelPipelineCustomizer = options.channelPipelineCustomizer();
this.autoCloseConnectionPoolListener = autoCloseConnectionPoolListener;

this.options = options;

Expand Down Expand Up @@ -461,6 +463,9 @@ private void closeAsync(CompletableFuture<?> future) {
logger.warn("Failed to close {}s:", HttpChannelPool.class.getSimpleName(), cause);
}

if (autoCloseConnectionPoolListener) {
connectionPoolListener.close();
}
if (shutdownWorkerGroupOnClose) {
workerGroup.shutdownGracefully().addListener((FutureListener<Object>) f -> {
if (f.cause() != null) {
Expand Down
Loading

0 comments on commit 62efa6a

Please sign in to comment.