This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new 843f8fe321a KAFKA-20398: Fix memory leak by removing
StreamsThreadMetricsDelegatingReporter from metrics registry on thread shutdown
(#21973)
843f8fe321a is described below
commit 843f8fe321af3ce0351c1c45df314f9e5eec3b06
Author: sstremler <[email protected]>
AuthorDate: Tue Apr 7 21:00:07 2026 +0200
KAFKA-20398: Fix memory leak by removing
StreamsThreadMetricsDelegatingReporter from metrics registry on thread shutdown
(#21973)
When a stream thread in Kafka Streams crashes,
`StreamsUncaughtExceptionHandler` can be configured to replace the
crashed thread with a new one. Each time a new stream thread is created,
a new `StreamsThreadMetricsDelegatingReporter` is added to the metrics
registry. However, when the old thread shuts down, its reporter is never
removed from the registry, so memory grows without bound as threads are
replaced.
This bug was introduced via https://github.com/apache/kafka/pull/17021
This PR ensures that a thread's
`StreamsThreadMetricsDelegatingReporter` is removed from the metrics
registry when that thread shuts down.
Reviewers: Matthias J. Sax <[email protected]>, Bill Bejeck
<[email protected]>
Signed-off-by: Szabolcs Stremler <[email protected]>
---
.../streams/processor/internals/StreamThread.java | 17 +++++--
.../processor/internals/StreamThreadTest.java | 58 +++++++++++++++++++---
2 files changed, 65 insertions(+), 10 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 29724166e7a..833d42aeae4 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -384,6 +384,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
private final WindowedSum punctuateLatencyWindowedSum = new WindowedSum();
private final WindowedSum runOnceLatencyWindowedSum = new WindowedSum();
private final MetricConfig metricsConfig;
+ private final StreamsThreadMetricsDelegatingReporter metricsReporter;
private boolean latencyWindowsInitialized = false;
@@ -514,8 +515,8 @@ public class StreamThread extends Thread implements
ProcessingThread {
taskManager.setMainConsumer(mainConsumerSetup.mainConsumer);
referenceContainer.mainConsumer = mainConsumerSetup.mainConsumer;
- final StreamsThreadMetricsDelegatingReporter reporter = new
StreamsThreadMetricsDelegatingReporter(mainConsumerSetup.mainConsumer,
threadId, Optional.of(stateUpdaterId));
- streamsMetrics.metricsRegistry().addReporter(reporter);
+ final StreamsThreadMetricsDelegatingReporter metricsReporter = new
StreamsThreadMetricsDelegatingReporter(mainConsumerSetup.mainConsumer,
threadId, Optional.of(stateUpdaterId));
+ streamsMetrics.metricsRegistry().addReporter(metricsReporter);
final StreamThread streamThread = new StreamThread(
time,
@@ -539,7 +540,8 @@ public class StreamThread extends Thread implements
ProcessingThread {
streamsUncaughtExceptionHandler,
cache::resize,
mainConsumerSetup.streamsRebalanceData,
- streamsMetadataState
+ streamsMetadataState,
+ metricsReporter
);
return streamThread.updateThreadMetadata(adminClientId(clientId));
@@ -786,7 +788,8 @@ public class StreamThread extends Thread implements
ProcessingThread {
final BiConsumer<Throwable, Boolean>
streamsUncaughtExceptionHandler,
final java.util.function.Consumer<Long> cacheResizer,
final Optional<StreamsRebalanceData>
streamsRebalanceData,
- final StreamsMetadataState streamsMetadataState
+ final StreamsMetadataState streamsMetadataState,
+ final StreamsThreadMetricsDelegatingReporter
metricsReporter
) {
super(threadId);
this.stateLock = new Object();
@@ -809,6 +812,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
this.cacheResizer = cacheResizer;
this.metricsConfig = streamsMetrics.metricsRegistry().config();
+ this.metricsReporter = metricsReporter;
// The following sensors are created here but their references are not
stored in this object, since within
// this object they are not recorded. The sensors are created here so
that the stream threads starts with all
@@ -1905,6 +1909,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
}
streamsMetrics.removeAllThreadLevelSensors(getName());
streamsMetrics.removeAllThreadLevelMetrics(getName());
+ streamsMetrics.metricsRegistry().removeReporter(metricsReporter);
setState(State.DEAD);
@@ -2102,6 +2107,10 @@ public class StreamThread extends Thread implements
ProcessingThread {
return streamsRebalanceData;
}
+ StreamsMetricsImpl streamsMetrics() {
+ return streamsMetrics;
+ }
+
/**
* Initialize both WindowedSum instances at exactly the same timestamp so
* their windows are aligned from the very beginning.
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 3ea49bc00ef..21e06b19cc1 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -52,6 +52,7 @@ import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
import org.apache.kafka.common.serialization.Serdes;
@@ -71,6 +72,7 @@ import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.errors.TopologyException;
+import
org.apache.kafka.streams.internals.metrics.StreamsThreadMetricsDelegatingReporter;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
@@ -1449,6 +1451,7 @@ public class StreamThreadTest {
HANDLER,
null,
Optional.empty(),
+ null,
null
).updateThreadMetadata(adminClientId(CLIENT_ID));
@@ -2474,6 +2477,7 @@ public class StreamThreadTest {
HANDLER,
null,
Optional.empty(),
+ null,
null
) {
@Override
@@ -2535,6 +2539,7 @@ public class StreamThreadTest {
HANDLER,
null,
Optional.empty(),
+ null,
null
) {
@Override
@@ -2604,6 +2609,7 @@ public class StreamThreadTest {
HANDLER,
null,
Optional.empty(),
+ null,
null
) {
@Override
@@ -2670,6 +2676,7 @@ public class StreamThreadTest {
HANDLER,
null,
Optional.empty(),
+ null,
null
) {
@Override
@@ -2733,6 +2740,7 @@ public class StreamThreadTest {
HANDLER,
null,
Optional.empty(),
+ null,
null
) {
@Override
@@ -2965,6 +2973,7 @@ public class StreamThreadTest {
HANDLER,
null,
Optional.empty(),
+ null,
null
);
final MetricName testMetricName = new MetricName("test_metric", "",
"", new HashMap<>());
@@ -3024,6 +3033,7 @@ public class StreamThreadTest {
(e, b) -> { },
null,
Optional.empty(),
+ null,
null
) {
@Override
@@ -3611,7 +3621,8 @@ public class StreamThreadTest {
HANDLER,
null,
Optional.of(streamsRebalanceData),
- streamsMetadataState
+ streamsMetadataState,
+ null
).updateThreadMetadata(adminClientId(CLIENT_ID));
thread.setState(State.STARTING);
@@ -3672,7 +3683,8 @@ public class StreamThreadTest {
HANDLER,
null,
Optional.of(streamsRebalanceData),
- streamsMetadataState
+ streamsMetadataState,
+ null
).updateThreadMetadata(adminClientId(CLIENT_ID));
thread.setState(State.STARTING);
@@ -3742,7 +3754,8 @@ public class StreamThreadTest {
HANDLER,
null,
Optional.of(streamsRebalanceData),
- streamsMetadataState
+ streamsMetadataState,
+ null
).updateThreadMetadata(adminClientId(CLIENT_ID));
thread.setState(State.STARTING);
@@ -3803,7 +3816,8 @@ public class StreamThreadTest {
HANDLER,
null,
Optional.of(streamsRebalanceData),
- streamsMetadataState
+ streamsMetadataState,
+ null
).updateThreadMetadata(adminClientId(CLIENT_ID));
thread.setState(State.STARTING);
@@ -3864,7 +3878,8 @@ public class StreamThreadTest {
HANDLER,
null,
Optional.of(streamsRebalanceData),
- streamsMetadataState
+ streamsMetadataState,
+ null
).updateThreadMetadata(adminClientId(CLIENT_ID));
thread.setState(State.STARTING);
@@ -3934,7 +3949,8 @@ public class StreamThreadTest {
HANDLER,
null,
Optional.of(streamsRebalanceData),
- streamsMetadataState
+ streamsMetadataState,
+ null
).updateThreadMetadata(adminClientId(CLIENT_ID));
thread.setState(State.STARTING);
@@ -3998,6 +4014,34 @@ public class StreamThreadTest {
);
}
+ @Test
+ public void shouldRemoveMetricsDelegatingReporterOnShutdown() throws
InterruptedException {
+ thread = createStreamThread(CLIENT_ID, false);
+
+ final List<MetricsReporter> reportersAfterCreate =
thread.streamsMetrics().metricsRegistry().reporters();
+ assertThat(
+ reportersAfterCreate.stream()
+ .filter(r -> r instanceof
StreamsThreadMetricsDelegatingReporter)
+ .count(),
+ equalTo(1L)
+ );
+
+ thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
+ TestUtils.waitForCondition(
+ () -> thread.state() == StreamThread.State.DEAD,
+ 10 * 1000,
+ "Thread never shut down."
+ );
+
+ final List<MetricsReporter> reportersAfterShutdown =
thread.streamsMetrics().metricsRegistry().reporters();
+ assertThat(
+ reportersAfterShutdown.stream()
+ .filter(r -> r instanceof
StreamsThreadMetricsDelegatingReporter)
+ .count(),
+ equalTo(0L)
+ );
+ }
+
private StreamThread setUpThread(final Properties streamsConfigProps) {
final StreamsConfig config = new StreamsConfig(streamsConfigProps);
final ConsumerGroupMetadata consumerGroupMetadata =
Mockito.mock(ConsumerGroupMetadata.class);
@@ -4030,6 +4074,7 @@ public class StreamThreadTest {
null,
null,
Optional.empty(),
+ null,
null
);
}
@@ -4131,6 +4176,7 @@ public class StreamThreadTest {
HANDLER,
null,
Optional.empty(),
+ null,
null
);
}