Skip to content

Commit

Permalink
KAFKA-16584 Make log processing summary configurable or debug (apache…
Browse files Browse the repository at this point in the history
…#16509)

KAFKA-16584 Make log processing summary configurable or debug

Reviewers: Matthias Sax <[email protected]>, Bill Bejeck <[email protected]>
  • Loading branch information
dujian0068 authored Jul 23, 2024
1 parent d43806c commit 7efb58f
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 6 deletions.
16 changes: 16 additions & 0 deletions docs/streams/developer-guide/config-streams.html
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
<li><a class="reference internal" href="#task-assignor-class" id="id39">task.assignor.class</a></li>
<li><a class="reference internal" href="#topology-optimization" id="id31">topology.optimization</a></li>
<li><a class="reference internal" href="#windowed-inner-class-serde" id="id38">windowed.inner.class.serde</a></li>
<li><a class="reference internal" href="#log-summary-interval-ms" id="id40">log.summary.interval.ms</a></li>
</ul>
</li>
<li><a class="reference internal" href="#kafka-consumers-and-producer-configuration-parameters" id="id16">Kafka consumers and producer configuration parameters</a>
Expand Down Expand Up @@ -470,6 +471,11 @@ <h4><a class="toc-backref" href="#id23">num.standby.replicas</a><a class="header
<td colspan="2">Sets window size for the deserializer in order to calculate window end times.</td>
<td><code class="docutils literal"><span class="pre">null</span></code></td>
</tr>
<tr class="row-even"><td>log.summary.interval.ms</td>
<td>Low</td>
<td colspan="2">Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift.</td>
<td>120000milliseconds (2 minutes)</td>
</tr>
</tbody>
</table>
<div class="section" id="acceptable-recovery-lag">
Expand Down Expand Up @@ -1066,6 +1072,16 @@ <h4><a class="toc-backref" href="#id31">windowed.inner.class.serde</a><a class="
</p>
</div></blockquote>
</div>
<div class="section" id="log-summary-interval-ms">
<h4><a class="toc-backref" href="#id40">log.summary.interval.ms</a><a class="headerlink" href="#log-summary-interval-ms" title="Permalink to this headline"></a></h4>
<blockquote>
<div>
This configuration controls the output interval for summary information.
If greater or equal to 0, the summary log will be output according to the set time interval;
If less than 0, summary output is disabled.
</div>
</blockquote>
</div>
<div class="section" id="upgrade-from">
<span id="streams-developer-guide-upgrade-from"></span><h4><a class="toc-backref" href="#id14">upgrade.from</a><a class="headerlink" href="#upgrade-from" title="Permalink to this headline"></a></h4>
<blockquote>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,10 @@ public class StreamsConfig extends AbstractConfig {
private static final String TASK_ASSIGNOR_CLASS_DOC = "A task assignor class or class name implementing the <code>" +
TaskAssignor.class.getName() + "</code> interface. Defaults to the <code>HighAvailabilityTaskAssignor</code> class.";

public static final String LOG_SUMMARY_INTERVAL_MS_CONFIG = "log.summary.interval.ms";
private static final String LOG_SUMMARY_INTERVAL_MS_DOC = "This configuration controls the output interval for summary information.\n" +
"If greater or equal to 0, the summary log will be output according to the set time interval;\n" +
"If less than 0, summary output is disabled.";
/**
* {@code topology.optimization}
* @deprecated since 2.7; use {@link #TOPOLOGY_OPTIMIZATION_CONFIG} instead
Expand Down Expand Up @@ -1206,7 +1210,12 @@ public class StreamsConfig extends AbstractConfig {
Type.LONG,
null,
Importance.LOW,
WINDOW_SIZE_MS_DOC);
WINDOW_SIZE_MS_DOC)
.define(LOG_SUMMARY_INTERVAL_MS_CONFIG,
Type.LONG,
2 * 60 * 1000L,
Importance.LOW,
LOG_SUMMARY_INTERVAL_MS_DOC);
}

// this is the list of configs for underlying clients
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ public boolean isStartingRunningOrPartitionAssigned() {
private final Sensor commitRatioSensor;
private final Sensor failedStreamThreadSensor;

private static final long LOG_SUMMARY_INTERVAL_MS = 2 * 60 * 1000L; // log a summary of processing every 2 minutes
private final long logSummaryIntervalMs; // the count summary log output time interval
private long lastLogSummaryMs = -1L;
private long totalRecordsProcessedSinceLastSummary = 0L;
private long totalPunctuatorsSinceLastSummary = 0L;
Expand Down Expand Up @@ -643,6 +643,7 @@ public StreamThread(final Time time,
this.processingMode = processingMode(config);
this.stateUpdaterEnabled = InternalConfig.getStateUpdaterEnabled(config.originals());
this.processingThreadsEnabled = InternalConfig.getProcessingThreadsEnabled(config.originals());
this.logSummaryIntervalMs = config.getLong(StreamsConfig.LOG_SUMMARY_INTERVAL_MS_CONFIG);
}

private static final class InternalConsumerConfig extends ConsumerConfig {
Expand Down Expand Up @@ -1069,8 +1070,7 @@ void runOnceWithoutProcessingThreads() {
pollRatioSensor.record((double) pollLatency / runOnceLatency, now);
commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, now);

final boolean logProcessingSummary = now - lastLogSummaryMs > LOG_SUMMARY_INTERVAL_MS;
if (logProcessingSummary) {
if (logSummaryIntervalMs > 0 && now - lastLogSummaryMs > logSummaryIntervalMs) {
log.info("Processed {} total records, ran {} punctuators, and committed {} total tasks since the last update",
totalRecordsProcessedSinceLastSummary, totalPunctuatorsSinceLastSummary, totalCommittedSinceLastSummary);

Expand Down Expand Up @@ -1142,8 +1142,7 @@ void runOnceWithProcessingThreads() {
pollRatioSensor.record((double) pollLatency / runOnceLatency, now);
commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, now);

final boolean logProcessingSummary = now - lastLogSummaryMs > LOG_SUMMARY_INTERVAL_MS;
if (logProcessingSummary) {
if (logSummaryIntervalMs > 0 && now - lastLogSummaryMs > logSummaryIntervalMs) {
log.info("Committed {} total tasks since the last update", totalCommittedSinceLastSummary);

totalCommittedSinceLastSummary = 0L;
Expand Down

0 comments on commit 7efb58f

Please sign in to comment.