From 7009071b6d53bbc3d740ea99cdc0c010692679ab Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Sat, 13 Apr 2024 10:00:23 -0700 Subject: [PATCH] [fix][broker] Optimize /metrics, fix unbounded request queue issue and fix race conditions in metricsBufferResponse mode (#22494) --- conf/proxy.conf | 6 +- .../PrometheusMetricsGeneratorUtils.java | 2 +- .../prometheus/PrometheusMetricsServlet.java | 149 +++++--- .../pulsar/broker/stats/TimeWindow.java | 94 ----- .../pulsar/broker/stats/WindowWrap.java | 56 --- .../stats/prometheus/MetricsExports.java | 68 ++++ .../prometheus/PrometheusMetricStreams.java | 2 +- .../PrometheusMetricsGenerator.java | 328 +++++++++++------- .../PulsarPrometheusMetricsServlet.java | 140 +++++++- .../broker/stats/prometheus/TopicStats.java | 12 +- .../pulsar/PrometheusMetricsTestUtil.java | 84 +++++ .../persistent/BucketDelayedDeliveryTest.java | 6 +- .../persistent/PersistentTopicTest.java | 4 +- .../service/schema/SchemaServiceTest.java | 4 +- .../broker/stats/ConsumerStatsTest.java | 4 +- .../broker/stats/MetadataStoreStatsTest.java | 6 +- .../broker/stats/PrometheusMetricsTest.java | 120 ++++--- .../broker/stats/SubscriptionStatsTest.java | 4 +- .../pulsar/broker/stats/TimeWindowTest.java | 83 ----- .../broker/stats/TransactionMetricsTest.java | 18 +- .../buffer/TransactionBufferClientTest.java | 4 +- .../pendingack/PendingAckPersistentTest.java | 4 +- .../pulsar/broker/web/WebServiceTest.java | 4 +- .../common/util/SimpleTextOutputStream.java | 16 +- .../proxy/server/ProxyConfiguration.java | 6 + .../pulsar/proxy/server/ProxyService.java | 3 +- .../proxy/server/ProxyServiceStarter.java | 40 ++- 27 files changed, 739 insertions(+), 528 deletions(-) delete mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/TimeWindow.java delete mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/WindowWrap.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/MetricsExports.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/PrometheusMetricsTestUtil.java delete mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TimeWindowTest.java diff --git a/conf/proxy.conf b/conf/proxy.conf index 8285e1cb75320..5a9d433f39ceb 100644 --- a/conf/proxy.conf +++ b/conf/proxy.conf @@ -376,5 +376,7 @@ zooKeeperCacheExpirySeconds=-1 enableProxyStatsEndpoints=true # Whether the '/metrics' endpoint requires authentication. Defaults to true authenticateMetricsEndpoint=true -# Enable cache metrics data, default value is false -metricsBufferResponse=false +# Time in milliseconds that metrics endpoint would time out. Default is 30s. +# Set it to 0 to disable timeout. +metricsServletTimeoutMs=30000 + diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java index 828d9871bb3de..077d5280b5102 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java @@ -76,7 +76,7 @@ public static void generateSystemMetrics(SimpleTextOutputStream stream, String c } for (int j = 0; j < sample.labelNames.size(); j++) { String labelValue = sample.labelValues.get(j); - if (labelValue != null) { + if (labelValue != null && labelValue.indexOf('"') > -1) { labelValue = labelValue.replace("\"", "\\\""); } if (j > 0) { diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java index 64d1fcdab6f14..8a41bed29d44f 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java @@ -25,9 +25,13 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import javax.servlet.AsyncContext; +import javax.servlet.AsyncEvent; +import javax.servlet.AsyncListener; import javax.servlet.ServletException; -import javax.servlet.ServletOutputStream; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -35,67 +39,132 @@ import org.slf4j.LoggerFactory; public class PrometheusMetricsServlet extends HttpServlet { - private static final long serialVersionUID = 1L; - private static final int HTTP_STATUS_OK_200 = 200; - private static final int HTTP_STATUS_INTERNAL_SERVER_ERROR_500 = 500; - - private final long metricsServletTimeoutMs; - private final String cluster; + static final int HTTP_STATUS_OK_200 = 200; + static final int HTTP_STATUS_INTERNAL_SERVER_ERROR_500 = 500; + protected final long metricsServletTimeoutMs; + protected final String cluster; protected List metricsProviders; - private ExecutorService executor = null; + protected ExecutorService executor = null; + protected final int executorMaxThreads; public PrometheusMetricsServlet(long metricsServletTimeoutMs, String cluster) { + this(metricsServletTimeoutMs, cluster, 1); + } + + public PrometheusMetricsServlet(long metricsServletTimeoutMs, String cluster, int executorMaxThreads) { this.metricsServletTimeoutMs = metricsServletTimeoutMs; this.cluster = cluster; + this.executorMaxThreads = executorMaxThreads; } @Override public void init() throws ServletException { - executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("prometheus-stats")); + if (executorMaxThreads > 0) { + executor = + Executors.newScheduledThreadPool(executorMaxThreads, new DefaultThreadFactory("prometheus-stats")); + } } @Override protected void doGet(HttpServletRequest request, HttpServletResponse response) { AsyncContext context = request.startAsync(); - context.setTimeout(metricsServletTimeoutMs); - executor.execute(() -> { - long start = System.currentTimeMillis(); - HttpServletResponse res = (HttpServletResponse) context.getResponse(); - try { - res.setStatus(HTTP_STATUS_OK_200); - res.setContentType("text/plain;charset=utf-8"); - generateMetrics(cluster, res.getOutputStream()); - } catch (Exception e) { - long end = System.currentTimeMillis(); - long time = end - start; - if (e instanceof EOFException) { - // NO STACKTRACE - log.error("Failed to send metrics, " - + "likely the client or this server closed " - + "the connection due to a timeout ({} ms elapsed): {}", time, e + ""); - } else { - log.error("Failed to generate prometheus stats, {} ms elapsed", time, e); + // set hard timeout to 2 * timeout + if (metricsServletTimeoutMs > 0) { + context.setTimeout(metricsServletTimeoutMs * 2); + } + long startNanos = System.nanoTime(); + AtomicBoolean taskStarted = new AtomicBoolean(false); + Future future = executor.submit(() -> { + taskStarted.set(true); + long elapsedNanos = System.nanoTime() - startNanos; + // check if the request has been timed out, implement a soft timeout + // so that response writing can continue to up to 2 * timeout + if (metricsServletTimeoutMs > 0 && elapsedNanos > TimeUnit.MILLISECONDS.toNanos(metricsServletTimeoutMs)) { + log.warn("Prometheus metrics request was too long in queue ({}ms). Skipping sending metrics.", + TimeUnit.NANOSECONDS.toMillis(elapsedNanos)); + if (!response.isCommitted()) { + response.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500); } - res.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500); - } finally { - long end = System.currentTimeMillis(); - long time = end - start; - try { - context.complete(); - } catch (IllegalStateException e) { - // this happens when metricsServletTimeoutMs expires - // java.lang.IllegalStateException: AsyncContext completed and/or Request lifecycle recycled - log.error("Failed to generate prometheus stats, " - + "this is likely due to metricsServletTimeoutMs: {} ms elapsed: {}", time, e + ""); + context.complete(); + return; + } + handleAsyncMetricsRequest(context); + }); + context.addListener(new AsyncListener() { + @Override + public void onComplete(AsyncEvent asyncEvent) throws IOException { + if (!taskStarted.get()) { + future.cancel(false); } } + + @Override + public void onTimeout(AsyncEvent asyncEvent) throws IOException { + if (!taskStarted.get()) { + future.cancel(false); + } + log.warn("Prometheus metrics request timed out"); + HttpServletResponse res = (HttpServletResponse) context.getResponse(); + if (!res.isCommitted()) { + res.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500); + } + context.complete(); + } + + @Override + public void onError(AsyncEvent asyncEvent) throws IOException { + if (!taskStarted.get()) { + future.cancel(false); + } + } + + @Override + public void onStartAsync(AsyncEvent asyncEvent) throws IOException { + + } }); + + } + + private void handleAsyncMetricsRequest(AsyncContext context) { + long start = System.currentTimeMillis(); + HttpServletResponse res = (HttpServletResponse) context.getResponse(); + try { + generateMetricsSynchronously(res); + } catch (Exception e) { + long end = System.currentTimeMillis(); + long time = end - start; + if (e instanceof EOFException) { + // NO STACKTRACE + log.error("Failed to send metrics, " + + "likely the client or this server closed " + + "the connection due to a timeout ({} ms elapsed): {}", time, e + ""); + } else { + log.error("Failed to generate prometheus stats, {} ms elapsed", time, e); + } + if (!res.isCommitted()) { + res.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500); + } + } finally { + long end = System.currentTimeMillis(); + long time = end - start; + try { + context.complete(); + } catch (IllegalStateException e) { + // this happens when metricsServletTimeoutMs expires + // java.lang.IllegalStateException: AsyncContext completed and/or Request lifecycle recycled + log.error("Failed to generate prometheus stats, " + + "this is likely due to metricsServletTimeoutMs: {} ms elapsed: {}", time, e + ""); + } + } } - protected void generateMetrics(String cluster, ServletOutputStream outputStream) throws IOException { - PrometheusMetricsGeneratorUtils.generate(cluster, outputStream, metricsProviders); + private void generateMetricsSynchronously(HttpServletResponse res) throws IOException { + res.setStatus(HTTP_STATUS_OK_200); + res.setContentType("text/plain;charset=utf-8"); + PrometheusMetricsGeneratorUtils.generate(cluster, res.getOutputStream(), metricsProviders); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/TimeWindow.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/TimeWindow.java deleted file mode 100644 index 08730189322ee..0000000000000 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/TimeWindow.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.broker.stats; - -import java.util.concurrent.atomic.AtomicReferenceArray; -import java.util.function.Function; - -public final class TimeWindow { - private final int interval; - private final int sampleCount; - private final AtomicReferenceArray> array; - - public TimeWindow(int sampleCount, int interval) { - this.sampleCount = sampleCount; - this.interval = interval; - this.array = new AtomicReferenceArray<>(sampleCount); - } - - /** - * return current time window data. - * - * @param function generate data. - * @return - */ - public synchronized WindowWrap current(Function function) { - long millis = System.currentTimeMillis(); - - if (millis < 0) { - return null; - } - int idx = calculateTimeIdx(millis); - long windowStart = calculateWindowStart(millis); - while (true) { - WindowWrap old = array.get(idx); - if (old == null) { - WindowWrap window = new WindowWrap<>(interval, windowStart, null); - if (array.compareAndSet(idx, null, window)) { - T value = null == function ? null : function.apply(null); - window.value(value); - return window; - } else { - Thread.yield(); - } - } else if (windowStart == old.start()) { - return old; - } else if (windowStart > old.start()) { - T value = null == function ? null : function.apply(old.value()); - old.value(value); - old.resetWindowStart(windowStart); - return old; - } else { - //it should never goes here - throw new IllegalStateException(); - } - } - } - - private int calculateTimeIdx(long timeMillis) { - long timeId = timeMillis / this.interval; - return (int) (timeId % sampleCount); - } - - private long calculateWindowStart(long timeMillis) { - return timeMillis - timeMillis % this.interval; - } - - public int sampleCount() { - return sampleCount; - } - - public int interval() { - return interval; - } - - public long currentWindowStart(long millis) { - return this.calculateWindowStart(millis); - } -} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/WindowWrap.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/WindowWrap.java deleted file mode 100644 index 12869b82921e5..0000000000000 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/WindowWrap.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.broker.stats; - -public final class WindowWrap { - private final long interval; - private long start; - private T value; - - public WindowWrap(long interval, long windowStart, T value) { - this.interval = interval; - this.start = windowStart; - this.value = value; - } - - public long interval() { - return this.interval; - } - - public long start() { - return this.start; - } - - public T value() { - return value; - } - - public void value(T value) { - this.value = value; - } - - public WindowWrap resetWindowStart(long startTime) { - this.start = startTime; - return this; - } - - public boolean isTimeInWindow(long timeMillis) { - return start <= timeMillis && timeMillis < start + interval; - } -} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/MetricsExports.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/MetricsExports.java new file mode 100644 index 0000000000000..b80e5747d8a5a --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/MetricsExports.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats.prometheus; + +import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed; +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.Gauge; +import io.prometheus.client.hotspot.DefaultExports; +import org.apache.pulsar.PulsarVersion; +import org.apache.pulsar.common.util.DirectMemoryUtils; + +public class MetricsExports { + private static boolean initialized = false; + + private MetricsExports() { + } + + public static synchronized void initialize() { + if (!initialized) { + DefaultExports.initialize(); + register(CollectorRegistry.defaultRegistry); + initialized = true; + } + } + + public static void register(CollectorRegistry registry) { + Gauge.build("jvm_memory_direct_bytes_used", "-").create().setChild(new Gauge.Child() { + @Override + public double get() { + return getJvmDirectMemoryUsed(); + } + }).register(registry); + + Gauge.build("jvm_memory_direct_bytes_max", "-").create().setChild(new Gauge.Child() { + @Override + public double get() { + return DirectMemoryUtils.jvmMaxDirectMemory(); + } + }).register(registry); + + // metric to export pulsar version info + Gauge.build("pulsar_version_info", "-") + .labelNames("version", "commit").create() + .setChild(new Gauge.Child() { + @Override + public double get() { + return 1.0; + } + }, PulsarVersion.getVersion(), PulsarVersion.getGitSha()) + .register(registry); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreams.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreams.java index 93cbad4e19503..5a5a61404b87f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreams.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreams.java @@ -42,7 +42,7 @@ void writeSample(String metricName, Number value, String... labelsAndValuesArray stream.write(metricName).write('{'); for (int i = 0; i < labelsAndValuesArray.length; i += 2) { String labelValue = labelsAndValuesArray[i + 1]; - if (labelValue != null) { + if (labelValue != null && labelValue.indexOf('"') > -1) { labelValue = labelValue.replace("\"", "\\\""); } stream.write(labelsAndValuesArray[i]).write("=\"").write(labelValue).write('\"'); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java index 124f0d3e54e4f..bbd09335c0a97 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java @@ -20,40 +20,39 @@ import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGeneratorUtils.generateSystemMetrics; import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGeneratorUtils.getTypeStr; -import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed; import io.netty.buffer.ByteBuf; -import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; import io.prometheus.client.Collector; -import io.prometheus.client.CollectorRegistry; -import io.prometheus.client.Gauge; -import io.prometheus.client.Gauge.Child; -import io.prometheus.client.hotspot.DefaultExports; +import java.io.BufferedOutputStream; import java.io.IOException; import java.io.OutputStream; -import java.io.StringWriter; +import java.io.OutputStreamWriter; import java.io.Writer; -import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.StandardCharsets; +import java.time.Clock; import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.stats.NullStatsProvider; import org.apache.bookkeeper.stats.StatsProvider; -import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.broker.PulsarService; -import org.apache.pulsar.broker.stats.TimeWindow; -import org.apache.pulsar.broker.stats.WindowWrap; import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics; import org.apache.pulsar.broker.stats.metrics.ManagedLedgerCacheMetrics; import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.stats.Metrics; -import org.apache.pulsar.common.util.DirectMemoryUtils; import org.apache.pulsar.common.util.SimpleTextOutputStream; -import org.eclipse.jetty.server.HttpOutput; /** * Generate metrics aggregated at the namespace level and optionally at a topic level and formats them out @@ -62,123 +61,80 @@ * href="https://prometheus.io/docs/instrumenting/exposition_formats/">Exposition Formats */ @Slf4j -public class PrometheusMetricsGenerator { - private static volatile TimeWindow timeWindow; - private static final int MAX_COMPONENTS = 64; - - static { - DefaultExports.initialize(); - - Gauge.build("jvm_memory_direct_bytes_used", "-").create().setChild(new Child() { - @Override - public double get() { - return getJvmDirectMemoryUsed(); - } - }).register(CollectorRegistry.defaultRegistry); - - Gauge.build("jvm_memory_direct_bytes_max", "-").create().setChild(new Child() { - @Override - public double get() { - return DirectMemoryUtils.jvmMaxDirectMemory(); - } - }).register(CollectorRegistry.defaultRegistry); - - // metric to export pulsar version info - Gauge.build("pulsar_version_info", "-") - .labelNames("version", "commit").create() - .setChild(new Child() { - @Override - public double get() { - return 1.0; - } - }, PulsarVersion.getVersion(), PulsarVersion.getGitSha()) - .register(CollectorRegistry.defaultRegistry); - } - - public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics, - boolean includeProducerMetrics, OutputStream out) throws IOException { - generate(pulsar, includeTopicMetrics, includeConsumerMetrics, includeProducerMetrics, false, out, null); - } - - public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics, - boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel, - OutputStream out) throws IOException { - generate(pulsar, includeTopicMetrics, includeConsumerMetrics, includeProducerMetrics, - splitTopicAndPartitionIndexLabel, out, null); - } - - public static synchronized void generate(PulsarService pulsar, boolean includeTopicMetrics, - boolean includeConsumerMetrics, boolean includeProducerMetrics, - boolean splitTopicAndPartitionIndexLabel, OutputStream out, - List metricsProviders) throws IOException { - ByteBuf buffer; - boolean exposeBufferMetrics = pulsar.getConfiguration().isMetricsBufferResponse(); +public class PrometheusMetricsGenerator implements AutoCloseable { + private static final int DEFAULT_INITIAL_BUFFER_SIZE = 1024 * 1024; // 1MB + private static final int MINIMUM_FOR_MAX_COMPONENTS = 64; + + private volatile MetricsBuffer metricsBuffer; + private static AtomicReferenceFieldUpdater metricsBufferFieldUpdater = + AtomicReferenceFieldUpdater.newUpdater(PrometheusMetricsGenerator.class, MetricsBuffer.class, + "metricsBuffer"); + private volatile boolean closed; + + public static class MetricsBuffer { + private final CompletableFuture bufferFuture; + private final long createTimeslot; + private final AtomicInteger refCnt = new AtomicInteger(2); + + MetricsBuffer(long timeslot) { + bufferFuture = new CompletableFuture<>(); + createTimeslot = timeslot; + } - if (!exposeBufferMetrics) { - buffer = generate0(pulsar, includeTopicMetrics, includeConsumerMetrics, includeProducerMetrics, - splitTopicAndPartitionIndexLabel, metricsProviders); - } else { - if (null == timeWindow) { - int period = pulsar.getConfiguration().getManagedLedgerStatsPeriodSeconds(); - timeWindow = new TimeWindow<>(1, (int) TimeUnit.SECONDS.toMillis(period)); - } - WindowWrap window = timeWindow.current(oldBuf -> { - // release expired buffer, in case of memory leak - if (oldBuf != null && oldBuf.refCnt() > 0) { - oldBuf.release(); - log.debug("Cached metrics buffer released"); - } + public CompletableFuture getBufferFuture() { + return bufferFuture; + } - try { - ByteBuf buf = generate0(pulsar, includeTopicMetrics, includeConsumerMetrics, includeProducerMetrics, - splitTopicAndPartitionIndexLabel, metricsProviders); - log.debug("Generated metrics buffer size {}", buf.readableBytes()); - return buf; - } catch (IOException e) { - log.error("Generate metrics failed", e); - //return empty buffer if exception happens - return PulsarByteBufAllocator.DEFAULT.heapBuffer(0); - } - }); + long getCreateTimeslot() { + return createTimeslot; + } - if (null == window || null == window.value()) { - return; - } - buffer = window.value(); - log.debug("Current window start {}, current cached buf size {}", window.start(), buffer.readableBytes()); + /** + * Retain the buffer. This is allowed, only when the buffer is not already released. + * + * @return true if the buffer is retained successfully, false otherwise. + */ + boolean retain() { + return refCnt.updateAndGet(x -> x > 0 ? x + 1 : x) > 0; } - try { - if (out instanceof HttpOutput) { - HttpOutput output = (HttpOutput) out; - //no mem_copy and memory allocations here - ByteBuffer[] buffers = buffer.nioBuffers(); - for (ByteBuffer buffer0 : buffers) { - output.write(buffer0); - } - } else { - //read data from buffer and write it to output stream, with no more heap buffer(byte[]) allocation. - //not modify buffer readIndex/writeIndex here. - int readIndex = buffer.readerIndex(); - int readableBytes = buffer.readableBytes(); - for (int i = 0; i < readableBytes; i++) { - out.write(buffer.getByte(readIndex + i)); - } - } - } finally { - if (!exposeBufferMetrics && buffer.refCnt() > 0) { - buffer.release(); - log.debug("Metrics buffer released."); + /** + * Release the buffer. + */ + public void release() { + int newValue = refCnt.decrementAndGet(); + if (newValue == 0) { + bufferFuture.whenComplete((byteBuf, throwable) -> { + if (byteBuf != null) { + byteBuf.release(); + } + }); } } } - private static ByteBuf generate0(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics, - boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel, - List metricsProviders) throws IOException { - //Use unpooled buffers here to avoid direct buffer usage increasing. - //when write out 200MB data, MAX_COMPONENTS = 64 needn't mem_copy. see: CompositeByteBuf#consolidateIfNeeded() - ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.compositeDirectBuffer(MAX_COMPONENTS); + private final PulsarService pulsar; + private final boolean includeTopicMetrics; + private final boolean includeConsumerMetrics; + private final boolean includeProducerMetrics; + private final boolean splitTopicAndPartitionIndexLabel; + private final Clock clock; + + private volatile int initialBufferSize = DEFAULT_INITIAL_BUFFER_SIZE; + + public PrometheusMetricsGenerator(PulsarService pulsar, boolean includeTopicMetrics, + boolean includeConsumerMetrics, boolean includeProducerMetrics, + boolean splitTopicAndPartitionIndexLabel, Clock clock) { + this.pulsar = pulsar; + this.includeTopicMetrics = includeTopicMetrics; + this.includeConsumerMetrics = includeConsumerMetrics; + this.includeProducerMetrics = includeProducerMetrics; + this.splitTopicAndPartitionIndexLabel = splitTopicAndPartitionIndexLabel; + this.clock = clock; + } + + private ByteBuf generate0(List metricsProviders) { + ByteBuf buf = allocateMultipartCompositeDirectBuffer(); boolean exceptionHappens = false; //Used in namespace/topic and transaction aggregators as share metric names PrometheusMetricStreams metricStreams = new PrometheusMetricStreams(); @@ -220,10 +176,34 @@ private static ByteBuf generate0(PulsarService pulsar, boolean includeTopicMetri //if exception happens, release buffer if (exceptionHappens) { buf.release(); + } else { + // for the next time, the initial buffer size will be suggested by the last buffer size + initialBufferSize = Math.max(DEFAULT_INITIAL_BUFFER_SIZE, buf.readableBytes()); } } } + private ByteBuf allocateMultipartCompositeDirectBuffer() { + // use composite buffer with pre-allocated buffers to ensure that the pooled allocator can be used + // for allocating the buffers + ByteBufAllocator byteBufAllocator = PulsarByteBufAllocator.DEFAULT; + int chunkSize; + if (byteBufAllocator instanceof PooledByteBufAllocator) { + PooledByteBufAllocator pooledByteBufAllocator = (PooledByteBufAllocator) byteBufAllocator; + chunkSize = Math.max(pooledByteBufAllocator.metric().chunkSize(), DEFAULT_INITIAL_BUFFER_SIZE); + } else { + chunkSize = DEFAULT_INITIAL_BUFFER_SIZE; + } + CompositeByteBuf buf = byteBufAllocator.compositeDirectBuffer( + Math.max(MINIMUM_FOR_MAX_COMPONENTS, (initialBufferSize / chunkSize) + 1)); + int totalLen = 0; + while (totalLen < initialBufferSize) { + totalLen += chunkSize; + buf.addComponent(false, byteBufAllocator.directBuffer(chunkSize)); + } + return buf; + } + private static void generateBrokerBasicMetrics(PulsarService pulsar, SimpleTextOutputStream stream) { String clusterName = pulsar.getConfiguration().getClusterName(); // generate managedLedgerCache metrics @@ -269,12 +249,13 @@ private static void parseMetricsToPrometheusMetrics(Collection metrics, String name = key.substring(0, nameIndex); value = key.substring(nameIndex + 1); if (!names.contains(name)) { - stream.write("# TYPE ").write(name.replace("brk_", "pulsar_")).write(' ') - .write(getTypeStr(metricType)).write("\n"); + stream.write("# TYPE "); + writeNameReplacingBrkPrefix(stream, name); + stream.write(' ').write(getTypeStr(metricType)).write("\n"); names.add(name); } - stream.write(name.replace("brk_", "pulsar_")) - .write("{cluster=\"").write(cluster).write('"'); + writeNameReplacingBrkPrefix(stream, name); + stream.write("{cluster=\"").write(cluster).write('"'); } catch (Exception e) { continue; } @@ -283,12 +264,13 @@ private static void parseMetricsToPrometheusMetrics(Collection metrics, String name = entry.getKey(); if (!names.contains(name)) { - stream.write("# TYPE ").write(entry.getKey().replace("brk_", "pulsar_")).write(' ') - .write(getTypeStr(metricType)).write('\n'); + stream.write("# TYPE "); + writeNameReplacingBrkPrefix(stream, entry.getKey()); + stream.write(' ').write(getTypeStr(metricType)).write('\n'); names.add(name); } - stream.write(name.replace("brk_", "pulsar_")) - .write("{cluster=\"").write(cluster).write('"'); + writeNameReplacingBrkPrefix(stream, name); + stream.write("{cluster=\"").write(cluster).write('"'); } //to avoid quantile label duplicated @@ -308,18 +290,98 @@ private static void parseMetricsToPrometheusMetrics(Collection metrics, } } + private static SimpleTextOutputStream writeNameReplacingBrkPrefix(SimpleTextOutputStream stream, String name) { + if (name.startsWith("brk_")) { + return stream.write("pulsar_").write(CharBuffer.wrap(name).position("brk_".length())); + } else { + return stream.write(name); + } + } + private static void generateManagedLedgerBookieClientMetrics(PulsarService pulsar, SimpleTextOutputStream stream) { StatsProvider statsProvider = pulsar.getManagedLedgerClientFactory().getStatsProvider(); if (statsProvider instanceof NullStatsProvider) { return; } - try { - Writer writer = new StringWriter(); + try (Writer writer = new OutputStreamWriter(new BufferedOutputStream(new OutputStream() { + @Override + public void write(int b) throws IOException { + stream.writeByte(b); + } + + @Override + public void write(byte b[], int off, int len) throws IOException { + stream.write(b, off, len); + } + }), StandardCharsets.UTF_8)) { statsProvider.writeAllMetrics(writer); - stream.write(writer.toString()); } catch (IOException e) { - // nop + log.error("Failed to write managed ledger bookie client metrics", e); + } + } + + public MetricsBuffer renderToBuffer(Executor executor, List metricsProviders) { + boolean cacheMetricsResponse = pulsar.getConfiguration().isMetricsBufferResponse(); + while (!closed && !Thread.currentThread().isInterrupted()) { + long currentTimeSlot = cacheMetricsResponse ? calculateCurrentTimeSlot() : 0; + MetricsBuffer currentMetricsBuffer = metricsBuffer; + if (currentMetricsBuffer == null || currentMetricsBuffer.getBufferFuture().isCompletedExceptionally() + || (currentMetricsBuffer.getBufferFuture().isDone() + && (currentMetricsBuffer.getCreateTimeslot() != 0 + && currentTimeSlot > currentMetricsBuffer.getCreateTimeslot()))) { + MetricsBuffer newMetricsBuffer = new MetricsBuffer(currentTimeSlot); + if (metricsBufferFieldUpdater.compareAndSet(this, currentMetricsBuffer, newMetricsBuffer)) { + if (currentMetricsBuffer != null) { + currentMetricsBuffer.release(); + } + CompletableFuture bufferFuture = newMetricsBuffer.getBufferFuture(); + executor.execute(() -> { + try { + bufferFuture.complete(generate0(metricsProviders)); + } catch (Exception e) { + bufferFuture.completeExceptionally(e); + } finally { + if (currentTimeSlot == 0) { + // if the buffer is not cached, release it after the future is completed + metricsBufferFieldUpdater.compareAndSet(this, newMetricsBuffer, null); + newMetricsBuffer.release(); + } + } + }); + // no need to retain before returning since the new buffer starts with refCnt 2 + return newMetricsBuffer; + } else { + currentMetricsBuffer = metricsBuffer; + } + } + // retain the buffer before returning + // if the buffer is already released, retaining won't succeed, retry in that case + if (currentMetricsBuffer != null && currentMetricsBuffer.retain()) { + return currentMetricsBuffer; + } + } + return null; + } + + /** + * Calculate the current time slot based on the current time. + * This is to ensure that cached metrics are refreshed consistently at a fixed interval regardless of the request + * time. + */ + private long calculateCurrentTimeSlot() { + long cacheTimeoutMillis = + TimeUnit.SECONDS.toMillis(Math.max(1, pulsar.getConfiguration().getManagedLedgerStatsPeriodSeconds())); + long now = clock.millis(); + return now / cacheTimeoutMillis; + } + + @Override + public void close() { + closed = true; + MetricsBuffer buffer = metricsBufferFieldUpdater.getAndSet(this, null); + if (buffer != null) { + buffer.release(); } } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java index 42bd2652883b6..7fcc74e965c24 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java @@ -18,34 +18,142 @@ */ package org.apache.pulsar.broker.stats.prometheus; +import java.io.EOFException; import java.io.IOException; +import java.nio.ByteBuffer; +import java.time.Clock; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.servlet.AsyncContext; +import javax.servlet.AsyncEvent; +import javax.servlet.AsyncListener; import javax.servlet.ServletOutputStream; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.PulsarService; +import org.eclipse.jetty.server.HttpOutput; +@Slf4j public class PulsarPrometheusMetricsServlet extends PrometheusMetricsServlet { - private static final long serialVersionUID = 1L; + private static final int EXECUTOR_MAX_THREADS = 4; - private final PulsarService pulsar; - private final boolean shouldExportTopicMetrics; - private final boolean shouldExportConsumerMetrics; - private final boolean shouldExportProducerMetrics; - private final boolean splitTopicAndPartitionLabel; + private final PrometheusMetricsGenerator prometheusMetricsGenerator; public PulsarPrometheusMetricsServlet(PulsarService pulsar, boolean includeTopicMetrics, - boolean includeConsumerMetrics, boolean shouldExportProducerMetrics, + boolean includeConsumerMetrics, boolean includeProducerMetrics, boolean splitTopicAndPartitionLabel) { - super(pulsar.getConfiguration().getMetricsServletTimeoutMs(), pulsar.getConfiguration().getClusterName()); - this.pulsar = pulsar; - this.shouldExportTopicMetrics = includeTopicMetrics; - this.shouldExportConsumerMetrics = includeConsumerMetrics; - this.shouldExportProducerMetrics = shouldExportProducerMetrics; - this.splitTopicAndPartitionLabel = splitTopicAndPartitionLabel; + super(pulsar.getConfiguration().getMetricsServletTimeoutMs(), pulsar.getConfiguration().getClusterName(), + EXECUTOR_MAX_THREADS); + MetricsExports.initialize(); + prometheusMetricsGenerator = + new PrometheusMetricsGenerator(pulsar, includeTopicMetrics, includeConsumerMetrics, + includeProducerMetrics, splitTopicAndPartitionLabel, Clock.systemUTC()); } + @Override - protected void generateMetrics(String cluster, ServletOutputStream outputStream) throws IOException { - PrometheusMetricsGenerator.generate(pulsar, shouldExportTopicMetrics, shouldExportConsumerMetrics, - shouldExportProducerMetrics, splitTopicAndPartitionLabel, outputStream, metricsProviders); + public void destroy() { + super.destroy(); + prometheusMetricsGenerator.close(); + } + + protected void doGet(HttpServletRequest request, HttpServletResponse response) { + AsyncContext context = request.startAsync(); + // set hard timeout to 2 * timeout + if (metricsServletTimeoutMs > 0) { + context.setTimeout(metricsServletTimeoutMs * 2); + } + long startNanos = System.nanoTime(); + AtomicBoolean skipWritingResponse = new AtomicBoolean(false); + context.addListener(new AsyncListener() { + @Override + public void onComplete(AsyncEvent event) throws IOException { + } + + @Override + public void onTimeout(AsyncEvent event) throws IOException { + log.warn("Prometheus metrics request timed out"); + skipWritingResponse.set(true); + HttpServletResponse res = (HttpServletResponse) context.getResponse(); + if (!res.isCommitted()) { + res.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500); + } + context.complete(); + } + + @Override + public void onError(AsyncEvent event) throws IOException { + skipWritingResponse.set(true); + } + + @Override + public void onStartAsync(AsyncEvent event) throws IOException { + } + }); + PrometheusMetricsGenerator.MetricsBuffer metricsBuffer = + prometheusMetricsGenerator.renderToBuffer(executor, metricsProviders); + if (metricsBuffer == null) { + log.info("Service is closing, skip writing metrics."); + response.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500); + context.complete(); + return; + } + metricsBuffer.getBufferFuture().whenComplete((buffer, ex) -> executor.execute(() -> { + try { + long elapsedNanos = System.nanoTime() - startNanos; + // check if the request has been timed out, implement a soft timeout + // so that response writing can continue to up to 2 * timeout + if (metricsServletTimeoutMs > 0 && elapsedNanos > TimeUnit.MILLISECONDS.toNanos( + metricsServletTimeoutMs)) { + log.warn("Prometheus metrics request was too long in queue ({}ms). Skipping sending metrics.", + TimeUnit.NANOSECONDS.toMillis(elapsedNanos)); + if (!response.isCommitted() && !skipWritingResponse.get()) { + response.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500); + } + return; + } + if (skipWritingResponse.get()) { + log.warn("Response has timed or failed, skip writing metrics."); + return; + } + if (response.isCommitted()) { + log.warn("Response is already committed, cannot write metrics"); + return; + } + if (ex != null) { + log.error("Failed to generate metrics", ex); + response.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500); + return; + } + if (buffer == null) { + log.error("Failed to generate metrics, buffer is null"); + response.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500); + } else { + response.setStatus(HTTP_STATUS_OK_200); + response.setContentType("text/plain;charset=utf-8"); + ServletOutputStream outputStream = response.getOutputStream(); + if (outputStream instanceof HttpOutput) { + HttpOutput output = (HttpOutput) outputStream; + for (ByteBuffer nioBuffer : buffer.nioBuffers()) { + output.write(nioBuffer); + } + } else { + int length = buffer.readableBytes(); + if (length > 0) { + buffer.duplicate().readBytes(outputStream, length); + } + } + } + } catch (EOFException e) { + log.error("Failed to write metrics to response due to EOFException"); + } catch (IOException e) { + log.error("Failed to write metrics to response", e); + } finally { + metricsBuffer.release(); + context.complete(); + } + })); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java index 4be006423f509..27288291d2969 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java @@ -507,7 +507,9 @@ private static void writeConsumerMetric(PrometheusMetricStreams stream, String m static void writeTopicMetric(PrometheusMetricStreams stream, String metricName, Number value, String cluster, String namespace, String topic, boolean splitTopicAndPartitionIndexLabel, String... extraLabelsAndValues) { - String[] labelsAndValues = new String[splitTopicAndPartitionIndexLabel ? 8 : 6]; + int baseLabelCount = splitTopicAndPartitionIndexLabel ? 8 : 6; + String[] labelsAndValues = + new String[baseLabelCount + (extraLabelsAndValues != null ? extraLabelsAndValues.length : 0)]; labelsAndValues[0] = "cluster"; labelsAndValues[1] = cluster; labelsAndValues[2] = "namespace"; @@ -527,7 +529,11 @@ static void writeTopicMetric(PrometheusMetricStreams stream, String metricName, } else { labelsAndValues[5] = topic; } - String[] labels = ArrayUtils.addAll(labelsAndValues, extraLabelsAndValues); - stream.writeSample(metricName, value, labels); + if (extraLabelsAndValues != null) { + for (int i = 0; i < extraLabelsAndValues.length; i++) { + labelsAndValues[baseLabelCount + i] = extraLabelsAndValues[i]; + } + } + stream.writeSample(metricName, value, labelsAndValues); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/PrometheusMetricsTestUtil.java b/pulsar-broker/src/test/java/org/apache/pulsar/PrometheusMetricsTestUtil.java new file mode 100644 index 0000000000000..fcc3b6aa88fb4 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/PrometheusMetricsTestUtil.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar; + +import com.google.common.util.concurrent.MoreExecutors; +import io.netty.buffer.ByteBuf; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.time.Clock; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; +import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider; +import org.eclipse.jetty.server.HttpOutput; + +public class PrometheusMetricsTestUtil { + public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics, + boolean includeProducerMetrics, OutputStream out) throws IOException { + generate(new PrometheusMetricsGenerator(pulsar, includeTopicMetrics, includeConsumerMetrics, + includeProducerMetrics, false, Clock.systemUTC()), out, null); + } + + public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics, + boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel, + OutputStream out) throws IOException { + generate(new PrometheusMetricsGenerator(pulsar, includeTopicMetrics, includeConsumerMetrics, + includeProducerMetrics, splitTopicAndPartitionIndexLabel, Clock.systemUTC()), out, null); + } + + public static void generate(PrometheusMetricsGenerator metricsGenerator, OutputStream out, + List metricsProviders) throws IOException { + PrometheusMetricsGenerator.MetricsBuffer metricsBuffer = + metricsGenerator.renderToBuffer(MoreExecutors.directExecutor(), metricsProviders); + try { + ByteBuf buffer = null; + try { + buffer = metricsBuffer.getBufferFuture().get(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); + } catch (ExecutionException | TimeoutException e) { + throw new IOException(e); + } + if (buffer == null) { + return; + } + if (out instanceof HttpOutput) { + HttpOutput output = (HttpOutput) out; + ByteBuffer[] nioBuffers = buffer.nioBuffers(); + for (ByteBuffer nioBuffer : nioBuffers) { + output.write(nioBuffer); + } + } else { + int length = buffer.readableBytes(); + if (length > 0) { + buffer.duplicate().readBytes(out, length); + } + } + } finally { + metricsBuffer.release(); + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java index 8be0aa4bc7dbd..ff8e418c024a0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java @@ -40,10 +40,10 @@ import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.pulsar.PrometheusMetricsTestUtil; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory; import org.apache.pulsar.broker.service.Dispatcher; -import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; @@ -218,7 +218,7 @@ public void testBucketDelayedIndexMetrics() throws Exception { Thread.sleep(2000); ByteArrayOutputStream output = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, true, true, true, output); + PrometheusMetricsTestUtil.generate(pulsar, true, true, true, output); String metricsStr = output.toString(StandardCharsets.UTF_8); Multimap metricsMap = parseMetrics(metricsStr); @@ -304,7 +304,7 @@ public void testBucketDelayedIndexMetrics() throws Exception { assertEquals(opLatencyMetricsSum.intValue(), opLatencyTopicMetrics.get().value); ByteArrayOutputStream namespaceOutput = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, false, true, true, namespaceOutput); + PrometheusMetricsTestUtil.generate(pulsar, false, true, true, namespaceOutput); Multimap namespaceMetricsMap = parseMetrics(namespaceOutput.toString(StandardCharsets.UTF_8)); Optional namespaceMetric = diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index c214634e6ed32..44d24668cc381 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -65,11 +65,11 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.PrometheusMetricsTestUtil; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.TopicPoliciesService; -import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -367,7 +367,7 @@ public void testDelayedDeliveryTrackerMemoryUsageMetric(String topic, boolean ex latch.await(10, TimeUnit.SECONDS); ByteArrayOutputStream output = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, exposeTopicLevelMetrics, true, true, output); + PrometheusMetricsTestUtil.generate(pulsar, exposeTopicLevelMetrics, true, true, output); String metricsStr = output.toString(StandardCharsets.UTF_8); Multimap metricsMap = parseMetrics(metricsStr); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java index a520b8c241bd1..3a4016eb79c21 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java @@ -43,11 +43,11 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import org.apache.pulsar.PrometheusMetricsTestUtil; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata; -import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo; import org.apache.pulsar.common.naming.TopicName; @@ -121,7 +121,7 @@ public void testSchemaRegistryMetrics() throws Exception { deleteSchema(schemaId, version(1)); ByteArrayOutputStream output = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, false, false, false, output); + PrometheusMetricsTestUtil.generate(pulsar, false, false, false, output); output.flush(); String metricsStr = output.toString(StandardCharsets.UTF_8); Multimap metrics = parseMetrics(metricsStr); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java index eb4500c13667a..512a5cfcab661 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java @@ -45,6 +45,7 @@ import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.PrometheusMetricsTestUtil; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; @@ -52,7 +53,6 @@ import org.apache.pulsar.broker.service.plugin.EntryFilter; import org.apache.pulsar.broker.service.plugin.EntryFilterProducerTest; import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader; -import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -335,7 +335,7 @@ private void testMessageAckRateMetric(String topicName, boolean exposeTopicLevel consumer2.updateRates(); ByteArrayOutputStream output = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, exposeTopicLevelMetrics, true, true, output); + PrometheusMetricsTestUtil.generate(pulsar, exposeTopicLevelMetrics, true, true, output); String metricStr = output.toString(StandardCharsets.UTF_8); Multimap metricsMap = parseMetrics(metricStr); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java index 15f41365da8d1..726bde3f3d0a9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java @@ -30,10 +30,10 @@ import java.util.concurrent.atomic.AtomicInteger; import lombok.Cleanup; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.PrometheusMetricsTestUtil; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; import org.apache.pulsar.broker.service.BrokerTestBase; -import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; @@ -101,7 +101,7 @@ public void testMetadataStoreStats() throws Exception { } ByteArrayOutputStream output = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, false, false, false, false, output); + PrometheusMetricsTestUtil.generate(pulsar, false, false, false, false, output); String metricsStr = output.toString(); Multimap metricsMap = parseMetrics(metricsStr); @@ -191,7 +191,7 @@ public void testBatchMetadataStoreMetrics() throws Exception { } ByteArrayOutputStream output = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, false, false, false, false, output); + PrometheusMetricsTestUtil.generate(pulsar, false, false, false, false, output); String metricsStr = output.toString(); Multimap metricsMap = parseMetrics(metricsStr); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index d3891931496c5..1fe0e99b49874 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -21,7 +21,10 @@ import static com.google.common.base.Preconditions.checkArgument; import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric; import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -35,6 +38,7 @@ import java.math.BigDecimal; import java.math.RoundingMode; import java.nio.charset.StandardCharsets; +import java.time.Clock; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -51,6 +55,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -61,6 +66,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.PrometheusMetricsTestUtil; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; @@ -85,7 +91,6 @@ import org.apache.pulsar.compaction.PulsarCompactionServiceFactory; import org.apache.zookeeper.CreateMode; import org.awaitility.Awaitility; -import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -155,7 +160,7 @@ public void testPublishRateLimitedTimes() throws Exception { }); @Cleanup ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, true, false, false, statsOut); String metricsStr = statsOut.toString(); Multimap metrics = parseMetrics(metricsStr); assertTrue(metrics.containsKey("pulsar_publish_rate_limit_times")); @@ -185,7 +190,7 @@ public void testPublishRateLimitedTimes() throws Exception { @Cleanup ByteArrayOutputStream statsOut2 = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut2); + PrometheusMetricsTestUtil.generate(pulsar, true, false, false, statsOut2); String metricsStr2 = statsOut2.toString(); Multimap metrics2 = parseMetrics(metricsStr2); assertTrue(metrics2.containsKey("pulsar_publish_rate_limit_times")); @@ -217,7 +222,7 @@ public void testMetricsTopicCount() throws Exception { Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); @Cleanup ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, true, false, false, statsOut); String metricsStr = statsOut.toString(); Multimap metrics = parseMetrics(metricsStr); Collection metric = metrics.get("pulsar_topics_count"); @@ -254,7 +259,7 @@ public void testMetricsAvgMsgSize2() throws Exception { producerInServer.getStats().msgThroughputIn = 100; @Cleanup ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, true, false, true, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, true, false, true, statsOut); String metricsStr = statsOut.toString(); Multimap metrics = parseMetrics(metricsStr); assertTrue(metrics.containsKey("pulsar_average_msg_size")); @@ -297,7 +302,7 @@ public void testPerTopicStats() throws Exception { } ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, true, false, false, statsOut); String metricsStr = statsOut.toString(); Multimap metrics = parseMetrics(metricsStr); @@ -395,7 +400,7 @@ public void testPerBrokerStats() throws Exception { } ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, true, false, false, statsOut); String metricsStr = statsOut.toString(); Multimap metrics = parseMetrics(metricsStr); @@ -504,7 +509,7 @@ public void testPerTopicStatsReconnect() throws Exception { c2.close(); ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, true, false, false, statsOut); String metricsStr = statsOut.toString(); Multimap metrics = parseMetrics(metricsStr); @@ -582,7 +587,7 @@ public void testStorageReadCacheMissesRate(boolean cacheEnable) throws Exception // includeTopicMetric true ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, true, false, false, statsOut); String metricsStr = statsOut.toString(); Multimap metrics = parseMetrics(metricsStr); @@ -614,7 +619,7 @@ public void testStorageReadCacheMissesRate(boolean cacheEnable) throws Exception // includeTopicMetric false ByteArrayOutputStream statsOut2 = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut2); + PrometheusMetricsTestUtil.generate(pulsar, false, false, false, statsOut2); String metricsStr2 = statsOut2.toString(); Multimap metrics2 = parseMetrics(metricsStr2); @@ -698,7 +703,7 @@ public void testPerTopicExpiredStat() throws Exception { Awaitility.await().until(() -> sub2.getExpiredMessageRate() != 0.0); ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, true, false, false, statsOut); String metricsStr = statsOut.toString(); Multimap metrics = parseMetrics(metricsStr); // There should be 2 metrics with different tags for each topic @@ -780,15 +785,15 @@ public void testBundlesMetrics() throws Exception { for (var latencyMetric : UnloadManager.LatencyMetric.values()) { var serviceUnit = "serviceUnit"; var brokerLookupAddress = "lookupAddress"; - var serviceUnitStateData = Mockito.mock(ServiceUnitStateData.class); - Mockito.when(serviceUnitStateData.sourceBroker()).thenReturn(brokerLookupAddress); - Mockito.when(serviceUnitStateData.dstBroker()).thenReturn(brokerLookupAddress); + var serviceUnitStateData = mock(ServiceUnitStateData.class); + when(serviceUnitStateData.sourceBroker()).thenReturn(brokerLookupAddress); + when(serviceUnitStateData.dstBroker()).thenReturn(brokerLookupAddress); latencyMetric.beginMeasurement(serviceUnit, brokerLookupAddress, serviceUnitStateData); latencyMetric.endMeasurement(serviceUnit); } ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, false, false, false, statsOut); String metricsStr = statsOut.toString(); Multimap metrics = parseMetrics(metricsStr); assertTrue(metrics.containsKey("pulsar_bundle_msg_rate_in")); @@ -838,7 +843,7 @@ public void testNonPersistentSubMetrics() throws Exception { } ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, true, false, false, statsOut); String metricsStr = statsOut.toString(); Multimap metrics = parseMetrics(metricsStr); assertTrue(metrics.containsKey("pulsar_subscription_back_log")); @@ -885,7 +890,7 @@ public void testPerNamespaceStats() throws Exception { } ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, false, false, false, statsOut); String metricsStr = statsOut.toString(); Multimap metrics = parseMetrics(metricsStr); @@ -958,7 +963,7 @@ public void testPerProducerStats() throws Exception { } ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, true, false, true, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, true, false, true, statsOut); String metricsStr = statsOut.toString(); Multimap metrics = parseMetrics(metricsStr); @@ -1026,7 +1031,7 @@ public void testPerConsumerStats() throws Exception { } ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, true, true, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, true, true, false, statsOut); String metricsStr = statsOut.toString(); Multimap metrics = parseMetrics(metricsStr); @@ -1113,7 +1118,7 @@ public void testDuplicateMetricTypeDefinitions() throws Exception { } ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, false, false, false, statsOut); String metricsStr = statsOut.toString(); Map typeDefs = new HashMap<>(); @@ -1217,7 +1222,7 @@ public void testManagedLedgerCacheStats() throws Exception { } ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, false, false, false, statsOut); String metricsStr = statsOut.toString(); Multimap metrics = parseMetrics(metricsStr); @@ -1253,7 +1258,7 @@ public void testManagedLedgerStats() throws Exception { } ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, false, false, false, statsOut); String metricsStr = statsOut.toString(); Multimap metrics = parseMetrics(metricsStr); @@ -1331,7 +1336,7 @@ public void testManagedLedgerBookieClientStats() throws Exception { } ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, false, false, false, statsOut); String metricsStr = statsOut.toString(); Multimap metrics = parseMetrics(metricsStr); @@ -1412,7 +1417,7 @@ public String getCommandData() { }); ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, false, false, false, statsOut); String metricsStr = statsOut.toString(); Multimap metrics = parseMetrics(metricsStr); List cm = (List) metrics.get("pulsar_authentication_success_total"); @@ -1473,7 +1478,7 @@ public String getCommandData() { } ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, false, false, false, statsOut); String metricsStr = statsOut.toString(); Multimap metrics = parseMetrics(metricsStr); List cm = (List) metrics.get("pulsar_expired_token_total"); @@ -1514,7 +1519,7 @@ public String getCommandData() { } ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, false, false, false, statsOut); String metricsStr = statsOut.toString(); Multimap metrics = parseMetrics(metricsStr); Metric countMetric = ((List) metrics.get("pulsar_expiring_token_minutes_count")).get(0); @@ -1588,7 +1593,7 @@ public void testManagedCursorPersistStats() throws Exception { // enable ExposeManagedCursorMetricsInPrometheus pulsar.getConfiguration().setExposeManagedCursorMetricsInPrometheus(true); ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, true, false, false, statsOut); String metricsStr = statsOut.toString(); Multimap metrics = parseMetrics(metricsStr); @@ -1601,7 +1606,7 @@ public void testManagedCursorPersistStats() throws Exception { // disable ExposeManagedCursorMetricsInPrometheus pulsar.getConfiguration().setExposeManagedCursorMetricsInPrometheus(false); ByteArrayOutputStream statsOut2 = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut2); + PrometheusMetricsTestUtil.generate(pulsar, true, false, false, statsOut2); String metricsStr2 = statsOut2.toString(); Multimap metrics2 = parseMetrics(metricsStr2); List cm2 = (List) metrics2.get("pulsar_ml_cursor_persistLedgerSucceed"); @@ -1620,7 +1625,7 @@ public void testBrokerConnection() throws Exception { .create(); ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, true, false, false, statsOut); String metricsStr = statsOut.toString(); Multimap metrics = parseMetrics(metricsStr); List cm = (List) metrics.get("pulsar_connection_created_total_count"); @@ -1637,7 +1642,7 @@ public void testBrokerConnection() throws Exception { pulsarClient.close(); statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, true, false, false, statsOut); metricsStr = statsOut.toString(); metrics = parseMetrics(metricsStr); @@ -1660,7 +1665,7 @@ public void testBrokerConnection() throws Exception { pulsarClient.close(); statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, true, false, false, statsOut); metricsStr = statsOut.toString(); metrics = parseMetrics(metricsStr); @@ -1704,7 +1709,7 @@ public void testCompaction() throws Exception { .messageRoutingMode(MessageRoutingMode.SinglePartition) .create(); ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, true, false, false, statsOut); String metricsStr = statsOut.toString(); Multimap metrics = parseMetrics(metricsStr); List cm = (List) metrics.get("pulsar_compaction_removed_event_count"); @@ -1739,7 +1744,7 @@ public void testCompaction() throws Exception { Compactor compactor = ((PulsarCompactionServiceFactory)pulsar.getCompactionServiceFactory()).getCompactor(); compactor.compact(topicName).get(); statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, true, false, false, statsOut); metricsStr = statsOut.toString(); metrics = parseMetrics(metricsStr); cm = (List) metrics.get("pulsar_compaction_removed_event_count"); @@ -1772,31 +1777,36 @@ public void testCompaction() throws Exception { @Test public void testMetricsWithCache() throws Throwable { - ServiceConfiguration configuration = Mockito.mock(ServiceConfiguration.class); - Mockito.when(configuration.getManagedLedgerStatsPeriodSeconds()).thenReturn(2); - Mockito.when(configuration.isMetricsBufferResponse()).thenReturn(true); - Mockito.when(configuration.getClusterName()).thenReturn(configClusterName); - Mockito.when(pulsar.getConfiguration()).thenReturn(configuration); + ServiceConfiguration configuration = pulsar.getConfiguration(); + configuration.setManagedLedgerStatsPeriodSeconds(2); + configuration.setMetricsBufferResponse(true); + configuration.setClusterName(configClusterName); - int period = pulsar.getConfiguration().getManagedLedgerStatsPeriodSeconds(); - TimeWindow timeWindow = new TimeWindow<>(2, (int) TimeUnit.SECONDS.toMillis(period)); + // create a mock clock to control the time + AtomicLong currentTimeMillis = new AtomicLong(System.currentTimeMillis()); + Clock clock = mock(); + when(clock.millis()).thenAnswer(invocation -> currentTimeMillis.get()); + PrometheusMetricsGenerator prometheusMetricsGenerator = + new PrometheusMetricsGenerator(pulsar, true, false, false, + false, clock); + + String previousMetrics = null; for (int a = 0; a < 4; a++) { - long start = System.currentTimeMillis(); ByteArrayOutputStream statsOut1 = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, true, false, false, false, statsOut1, null); + PrometheusMetricsTestUtil.generate(prometheusMetricsGenerator, statsOut1, null); ByteArrayOutputStream statsOut2 = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, true, false, false, false, statsOut2, null); - long end = System.currentTimeMillis(); - - if (timeWindow.currentWindowStart(start) == timeWindow.currentWindowStart(end)) { - String metricsStr1 = statsOut1.toString(); - String metricsStr2 = statsOut2.toString(); - assertEquals(metricsStr1, metricsStr2); - Multimap metrics = parseMetrics(metricsStr1); - } + PrometheusMetricsTestUtil.generate(prometheusMetricsGenerator, statsOut2, null); + + String metricsStr1 = statsOut1.toString(); + String metricsStr2 = statsOut2.toString(); + assertTrue(metricsStr1.length() > 1000); + assertEquals(metricsStr1, metricsStr2); + assertNotEquals(metricsStr1, previousMetrics); + previousMetrics = metricsStr1; - Thread.sleep(TimeUnit.SECONDS.toMillis(period / 2)); + // move time forward + currentTimeMillis.addAndGet(TimeUnit.SECONDS.toMillis(2)); } } @@ -1824,7 +1834,7 @@ public void testSplitTopicAndPartitionLabel() throws Exception { .subscribe(); @Cleanup ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, true, false, false, true, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, true, false, false, true, statsOut); String metricsStr = statsOut.toString(); Multimap metrics = parseMetrics(metricsStr); Collection metric = metrics.get("pulsar_consumers_count"); @@ -1860,7 +1870,7 @@ public void testMetricsGroupedByTypeDefinitions() throws Exception { } ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, false, false, false, statsOut); String metricsStr = statsOut.toString(); Pattern typePattern = Pattern.compile("^#\\s+TYPE\\s+(\\w+)\\s+(\\w+)"); @@ -1920,7 +1930,7 @@ public void testEscapeLabelValue() throws Exception { .subscribe(); @Cleanup ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, true, false, + PrometheusMetricsTestUtil.generate(pulsar, true, false, false, statsOut); String metricsStr = statsOut.toString(); final List subCountLines = metricsStr.lines() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java index e39860274d12f..3e71d8f211101 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java @@ -31,13 +31,13 @@ import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.PrometheusMetricsTestUtil; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.Dispatcher; import org.apache.pulsar.broker.service.EntryFilterSupport; import org.apache.pulsar.broker.service.plugin.EntryFilter; import org.apache.pulsar.broker.service.plugin.EntryFilterTest; import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader; -import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -233,7 +233,7 @@ public void testSubscriptionStats(final String topic, final String subName, bool } ByteArrayOutputStream output = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, enableTopicStats, false, false, output); + PrometheusMetricsTestUtil.generate(pulsar, enableTopicStats, false, false, output); String metricsStr = output.toString(); Multimap metrics = parseMetrics(metricsStr); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TimeWindowTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TimeWindowTest.java deleted file mode 100644 index 89528c1965397..0000000000000 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TimeWindowTest.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.broker.stats; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; -import org.testng.annotations.Test; - -public class TimeWindowTest { - - @Test - public void windowTest() throws Exception { - int intervalInMs = 1000; - int sampleCount = 2; - TimeWindow timeWindow = new TimeWindow<>(sampleCount, intervalInMs); - - WindowWrap expect1 = timeWindow.current(oldValue -> 1); - WindowWrap expect2 = timeWindow.current(oldValue -> null); - assertNotNull(expect1); - assertNotNull(expect2); - - if (expect1.start() == expect2.start()) { - assertEquals((int) expect1.value(), 1); - assertEquals(expect1, expect2); - assertEquals(expect1.value(), expect2.value()); - } - - Thread.sleep(intervalInMs); - - WindowWrap expect3 = timeWindow.current(oldValue -> 2); - WindowWrap expect4 = timeWindow.current(oldValue -> null); - assertNotNull(expect3); - assertNotNull(expect4); - - if (expect3.start() == expect4.start()) { - assertEquals((int) expect3.value(), 2); - assertEquals(expect3, expect4); - assertEquals(expect3.value(), expect4.value()); - } - - Thread.sleep(intervalInMs); - - WindowWrap expect5 = timeWindow.current(oldValue -> 3); - WindowWrap expect6 = timeWindow.current(oldValue -> null); - assertNotNull(expect5); - assertNotNull(expect6); - - if (expect5.start() == expect6.start()) { - assertEquals((int) expect5.value(), 3); - assertEquals(expect5, expect6); - assertEquals(expect5.value(), expect6.value()); - } - - Thread.sleep(intervalInMs); - - WindowWrap expect7 = timeWindow.current(oldValue -> 4); - WindowWrap expect8 = timeWindow.current(oldValue -> null); - assertNotNull(expect7); - assertNotNull(expect8); - - if (expect7.start() == expect8.start()) { - assertEquals((int) expect7.value(), 4); - assertEquals(expect7, expect8); - assertEquals(expect7.value(), expect8.value()); - } - } -} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java index 723a493eca1df..8d5cb9dc39148 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java @@ -38,9 +38,9 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.PrometheusMetricsTestUtil; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.BrokerTestBase; -import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; @@ -118,7 +118,7 @@ public void testTransactionCoordinatorMetrics() throws Exception { pulsar.getTransactionMetadataStoreService().getStores() .get(transactionCoordinatorIDTwo).newTransaction(timeout, null).get(); ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, true, false, false, statsOut); String metricsStr = statsOut.toString(); Multimap metrics = parseMetrics(metricsStr); Collection metric = metrics.get("pulsar_txn_active_count"); @@ -186,7 +186,7 @@ public void testTransactionCoordinatorRateMetrics() throws Exception { pulsar.getBrokerService().updateRates(); ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, true, false, false, statsOut); String metricsStr = statsOut.toString(); Multimap metrics = parseMetrics(metricsStr); @@ -216,7 +216,7 @@ public void testTransactionCoordinatorRateMetrics() throws Exception { }); statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, true, false, false, statsOut); metricsStr = statsOut.toString(); metrics = parseMetrics(metricsStr); @@ -272,7 +272,7 @@ public void testManagedLedgerMetrics() throws Exception { producer.send("hello pulsar".getBytes()); consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction).get(); ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, true, false, false, statsOut); String metricsStr = statsOut.toString(); Multimap metrics = parseMetrics(metricsStr); @@ -290,7 +290,7 @@ public void testManagedLedgerMetrics() throws Exception { checkManagedLedgerMetrics(MLTransactionLogImpl.TRANSACTION_SUBSCRIPTION_NAME, 126, metric); statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, false, false, false, statsOut); metricsStr = statsOut.toString(); metrics = parseMetrics(metricsStr); metric = metrics.get("pulsar_storage_size"); @@ -334,7 +334,7 @@ public void testManagedLedgerMetricsWhenPendingAckNotInit() throws Exception { producer.send("hello pulsar".getBytes()); consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction).get(); ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, true, false, false, statsOut); String metricsStr = statsOut.toString(); Multimap metrics = parseMetrics(metricsStr); @@ -359,7 +359,7 @@ public void testManagedLedgerMetricsWhenPendingAckNotInit() throws Exception { consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction).get(); statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, true, false, false, statsOut); metricsStr = statsOut.toString(); metrics = parseMetrics(metricsStr); metric = metrics.get("pulsar_storage_size"); @@ -393,7 +393,7 @@ public void testDuplicateMetricTypeDefinitions() throws Exception { .send(); } ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, false, false, false, statsOut); String metricsStr = statsOut.toString(); Map typeDefs = new HashMap<>(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java index be036a0cf590b..1c3de777e9349 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java @@ -46,9 +46,9 @@ import java.util.concurrent.TimeUnit; import lombok.Cleanup; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.PrometheusMetricsTestUtil; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; -import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; import org.apache.pulsar.broker.transaction.TransactionTestBase; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl; @@ -229,7 +229,7 @@ public void testTransactionBufferMetrics() throws Exception { @Cleanup ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsarServiceList.get(0), true, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsarServiceList.get(0), true, false, false, statsOut); String metricsStr = statsOut.toString(); Multimap metricsMap = parseMetrics(metricsStr); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java index 6c24b6b3f0151..db9daf56104c9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java @@ -46,12 +46,12 @@ import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.collections4.map.LinkedMap; +import org.apache.pulsar.PrometheusMetricsTestUtil; import org.apache.pulsar.broker.service.AbstractTopic; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; -import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; import org.apache.pulsar.broker.transaction.TransactionTestBase; import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore; import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl; @@ -255,7 +255,7 @@ public void testPendingAckMetrics() throws Exception { @Cleanup ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsarServiceList.get(0), true, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsarServiceList.get(0), true, false, false, statsOut); String metricsStr = statsOut.toString(); Multimap metricsMap = parseMetrics(metricsStr); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java index d2b59ed0e4997..17588a7ecac8b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java @@ -55,9 +55,9 @@ import javax.net.ssl.TrustManager; import lombok.Cleanup; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.PrometheusMetricsTestUtil; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; @@ -107,7 +107,7 @@ public class WebServiceTest { public void testWebExecutorMetrics() throws Exception { setupEnv(true, false, false, false, -1, false); ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut); + PrometheusMetricsTestUtil.generate(pulsar, false, false, false, statsOut); String metricsStr = statsOut.toString(); Multimap metrics = parseMetrics(metricsStr); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java index c8c639606aa3e..9bf6302f50f02 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java @@ -20,6 +20,7 @@ import io.netty.buffer.ByteBuf; import io.netty.util.CharsetUtil; +import java.nio.CharBuffer; /** * Format strings and numbers into a ByteBuf without any memory allocation. @@ -28,6 +29,7 @@ public class SimpleTextOutputStream { private final ByteBuf buffer; private static final char[] hexChars = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; + private final CharBuffer singleCharBuffer = CharBuffer.allocate(1); public SimpleTextOutputStream(ByteBuf buffer) { this.buffer = buffer; @@ -44,11 +46,17 @@ public SimpleTextOutputStream write(byte[] a, int offset, int len) { } public SimpleTextOutputStream write(char c) { - write(String.valueOf(c)); + // In UTF-8, any character from U+0000 to U+007F is encoded in one byte + if (c <= '\u007F') { + buffer.writeByte((byte) c); + return this; + } + singleCharBuffer.put(0, c); + buffer.writeCharSequence(singleCharBuffer, CharsetUtil.UTF_8); return this; } - public SimpleTextOutputStream write(String s) { + public SimpleTextOutputStream write(CharSequence s) { if (s == null) { return this; } @@ -136,4 +144,8 @@ public void write(ByteBuf byteBuf) { public ByteBuf getBuffer() { return buffer; } + + public void writeByte(int b) { + buffer.writeByte(b); + } } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java index db2969e3c3920..39c8fb5e086fd 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java @@ -392,6 +392,12 @@ public class ProxyConfiguration implements PulsarConfiguration { ) private boolean authenticateMetricsEndpoint = true; + @FieldContext( + category = CATEGORY_HTTP, + doc = "Time in milliseconds that metrics endpoint would time out. Default is 30s.\n" + + " Set it to 0 to disable timeout." + ) + private long metricsServletTimeoutMs = 30000; @FieldContext( category = CATEGORY_SASL_AUTH, diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index 61b00871cecdb..ea9e4ebfaa9b8 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -299,7 +299,8 @@ public void start() throws Exception { } private synchronized void createMetricsServlet() { - this.metricsServlet = new PrometheusMetricsServlet(-1L, proxyConfig.getClusterName()); + this.metricsServlet = + new PrometheusMetricsServlet(proxyConfig.getMetricsServletTimeoutMs(), proxyConfig.getClusterName()); if (pendingMetricsProviders != null) { pendingMetricsProviders.forEach(provider -> metricsServlet.addRawMetricsProvider(provider)); this.pendingMetricsProviders = null; diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java index 72d54601995f1..50a8e3ab7d753 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java @@ -24,6 +24,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed; import com.google.common.annotations.VisibleForTesting; +import io.prometheus.client.Collector; import io.prometheus.client.CollectorRegistry; import io.prometheus.client.Gauge; import io.prometheus.client.Gauge.Child; @@ -236,21 +237,36 @@ public void start() throws Exception { if (!metricsInitialized) { // Setup metrics DefaultExports.initialize(); + CollectorRegistry registry = CollectorRegistry.defaultRegistry; // Report direct memory from Netty counters - Gauge.build("jvm_memory_direct_bytes_used", "-").create().setChild(new Child() { - @Override - public double get() { - return getJvmDirectMemoryUsed(); - } - }).register(CollectorRegistry.defaultRegistry); + Collector jvmMemoryDirectBytesUsed = + Gauge.build("jvm_memory_direct_bytes_used", "-").create().setChild(new Child() { + @Override + public double get() { + return getJvmDirectMemoryUsed(); + } + }); + try { + registry.register(jvmMemoryDirectBytesUsed); + } catch (IllegalArgumentException e) { + // workaround issue in tests where the metric is already registered + log.debug("Failed to register jvm_memory_direct_bytes_used metric: {}", e.getMessage()); + } - Gauge.build("jvm_memory_direct_bytes_max", "-").create().setChild(new Child() { - @Override - public double get() { - return DirectMemoryUtils.jvmMaxDirectMemory(); - } - }).register(CollectorRegistry.defaultRegistry); + Collector jvmMemoryDirectBytesMax = + Gauge.build("jvm_memory_direct_bytes_max", "-").create().setChild(new Child() { + @Override + public double get() { + return DirectMemoryUtils.jvmMaxDirectMemory(); + } + }); + try { + registry.register(jvmMemoryDirectBytesMax); + } catch (IllegalArgumentException e) { + // workaround issue in tests where the metric is already registered + log.debug("Failed to register jvm_memory_direct_bytes_max metric: {}", e.getMessage()); + } metricsInitialized = true; }