From 7efb58f3211cdc3bd0524a79f9b5ce4ef74a77b4 Mon Sep 17 00:00:00 2001 From: dujian0068 <1426703092@qq.com> Date: Wed, 24 Jul 2024 04:09:25 +0800 Subject: [PATCH] KAFKA-16584 Make log processing summary configurable or debug (#16509) KAFKA-16584 Make log processing summary configurable or debug Reviewers: Matthias Sax , Bill Bejeck --- docs/streams/developer-guide/config-streams.html | 16 ++++++++++++++++ .../org/apache/kafka/streams/StreamsConfig.java | 11 ++++++++++- .../processor/internals/StreamThread.java | 9 ++++----- 3 files changed, 30 insertions(+), 6 deletions(-) diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html index 92a44e0be8c5f..f50945778a160 100644 --- a/docs/streams/developer-guide/config-streams.html +++ b/docs/streams/developer-guide/config-streams.html @@ -93,6 +93,7 @@
  • task.assignor.class
  • topology.optimization
  • windowed.inner.class.serde
  • +
  • log.summary.interval.ms
  • Kafka consumers and producer configuration parameters @@ -470,6 +471,11 @@

    num.standby.replicasSets window size for the deserializer in order to calculate window end times. null + log.summary.interval.ms + Low + Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. + 120000milliseconds (2 minutes) +
    @@ -1066,6 +1072,16 @@

    windowed.inner.class.serde +

    log.summary.interval.ms

    +
    +
    + This configuration controls the output interval for summary information. + If greater or equal to 0, the summary log will be output according to the set time interval; + If less than 0, summary output is disabled. +
    +
    +

    upgrade.from

    diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 4c2c2d18a2ead..eab567d525d02 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -834,6 +834,10 @@ public class StreamsConfig extends AbstractConfig { private static final String TASK_ASSIGNOR_CLASS_DOC = "A task assignor class or class name implementing the " + TaskAssignor.class.getName() + " interface. Defaults to the HighAvailabilityTaskAssignor class."; + public static final String LOG_SUMMARY_INTERVAL_MS_CONFIG = "log.summary.interval.ms"; + private static final String LOG_SUMMARY_INTERVAL_MS_DOC = "This configuration controls the output interval for summary information.\n" + + "If greater or equal to 0, the summary log will be output according to the set time interval;\n" + + "If less than 0, summary output is disabled."; /** * {@code topology.optimization} * @deprecated since 2.7; use {@link #TOPOLOGY_OPTIMIZATION_CONFIG} instead @@ -1206,7 +1210,12 @@ public class StreamsConfig extends AbstractConfig { Type.LONG, null, Importance.LOW, - WINDOW_SIZE_MS_DOC); + WINDOW_SIZE_MS_DOC) + .define(LOG_SUMMARY_INTERVAL_MS_CONFIG, + Type.LONG, + 2 * 60 * 1000L, + Importance.LOW, + LOG_SUMMARY_INTERVAL_MS_DOC); } // this is the list of configs for underlying clients diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 72e38821ff6c9..05c832811adc4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -303,7 +303,7 @@ public boolean isStartingRunningOrPartitionAssigned() { private final Sensor commitRatioSensor; private final Sensor failedStreamThreadSensor; - private static final long LOG_SUMMARY_INTERVAL_MS = 2 * 60 * 1000L; // log a summary of processing every 2 minutes + private final long logSummaryIntervalMs; // the count summary log output time interval private long lastLogSummaryMs = -1L; private long totalRecordsProcessedSinceLastSummary = 0L; private long totalPunctuatorsSinceLastSummary = 0L; @@ -643,6 +643,7 @@ public StreamThread(final Time time, this.processingMode = processingMode(config); this.stateUpdaterEnabled = InternalConfig.getStateUpdaterEnabled(config.originals()); this.processingThreadsEnabled = InternalConfig.getProcessingThreadsEnabled(config.originals()); + this.logSummaryIntervalMs = config.getLong(StreamsConfig.LOG_SUMMARY_INTERVAL_MS_CONFIG); } private static final class InternalConsumerConfig extends ConsumerConfig { @@ -1069,8 +1070,7 @@ void runOnceWithoutProcessingThreads() { pollRatioSensor.record((double) pollLatency / runOnceLatency, now); commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, now); - final boolean logProcessingSummary = now - lastLogSummaryMs > LOG_SUMMARY_INTERVAL_MS; - if (logProcessingSummary) { + if (logSummaryIntervalMs > 0 && now - lastLogSummaryMs > logSummaryIntervalMs) { log.info("Processed {} total records, ran {} punctuators, and committed {} total tasks since the last update", totalRecordsProcessedSinceLastSummary, totalPunctuatorsSinceLastSummary, totalCommittedSinceLastSummary); @@ -1142,8 +1142,7 @@ void runOnceWithProcessingThreads() { pollRatioSensor.record((double) pollLatency / runOnceLatency, now); commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, now); - final boolean logProcessingSummary = now - lastLogSummaryMs > LOG_SUMMARY_INTERVAL_MS; - if (logProcessingSummary) { + if (logSummaryIntervalMs > 0 && now - lastLogSummaryMs > logSummaryIntervalMs) { log.info("Committed {} total tasks since the last update", totalCommittedSinceLastSummary); totalCommittedSinceLastSummary = 0L;