This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.2 by this push:
     new ac44bba6556 KAFKA-20398: Fix memory leak by removing 
StreamsThreadMetricsDelegatingReporter from metrics registry on thread (#21989)
ac44bba6556 is described below

commit ac44bba6556e499248e2174b463892bfeaeb195a
Author: sstremler <[email protected]>
AuthorDate: Wed Apr 8 11:23:26 2026 +0200

    KAFKA-20398: Fix memory leak by removing 
StreamsThreadMetricsDelegatingReporter from metrics registry on thread (#21989)
    
    When a stream thread in Kafka Streams crashes,
    `StreamsUncaughtExceptionHandler` can be configured to replace the
    crashed thread with a new one. Each time a new stream thread is created,
    it adds a new `StreamsThreadMetricsDelegatingReporter` to it. However,
    when the old thread shuts down, its reporter is never removed from the
    registry implying "unbounded" memory growth.
    
    This bug was introduced via https://github.com/apache/kafka/pull/17021
    
    This PR ensures that the existing
    `StreamsThreadMetricsDelegatingReporter` is removed when a thread is
    shutting down.
    
    Reviewers: Matthias J. Sax <[email protected]>, Bill Bejeck
     <[email protected]>
---
 .../streams/processor/internals/StreamThread.java  | 17 +++++--
 .../processor/internals/StreamThreadTest.java      | 58 +++++++++++++++++++---
 2 files changed, 65 insertions(+), 10 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 41d8dbcc0ee..2f941130b56 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -375,6 +375,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
     private final boolean eosEnabled;
     private final boolean stateUpdaterEnabled;
     private final boolean processingThreadsEnabled;
+    private final StreamsThreadMetricsDelegatingReporter metricsReporter;
 
     private volatile long fetchDeadlineClientInstanceId = -1;
     private volatile KafkaFutureImpl<Uuid> mainConsumerInstanceIdFuture = new 
KafkaFutureImpl<>();
@@ -511,8 +512,8 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         taskManager.setMainConsumer(mainConsumerSetup.mainConsumer);
         referenceContainer.mainConsumer = mainConsumerSetup.mainConsumer;
 
-        final StreamsThreadMetricsDelegatingReporter reporter = new 
StreamsThreadMetricsDelegatingReporter(mainConsumerSetup.mainConsumer, 
threadId, Optional.of(stateUpdaterId));
-        streamsMetrics.metricsRegistry().addReporter(reporter);
+        final StreamsThreadMetricsDelegatingReporter metricsReporter = new 
StreamsThreadMetricsDelegatingReporter(mainConsumerSetup.mainConsumer, 
threadId, Optional.of(stateUpdaterId));
+        streamsMetrics.metricsRegistry().addReporter(metricsReporter);
 
         final StreamThread streamThread = new StreamThread(
             time,
@@ -536,7 +537,8 @@ public class StreamThread extends Thread implements 
ProcessingThread {
             streamsUncaughtExceptionHandler,
             cache::resize,
             mainConsumerSetup.streamsRebalanceData,
-            streamsMetadataState
+            streamsMetadataState,
+            metricsReporter
         );
 
         return streamThread.updateThreadMetadata(adminClientId(clientId));
@@ -785,7 +787,8 @@ public class StreamThread extends Thread implements 
ProcessingThread {
                         final BiConsumer<Throwable, Boolean> 
streamsUncaughtExceptionHandler,
                         final java.util.function.Consumer<Long> cacheResizer,
                         final Optional<StreamsRebalanceData> 
streamsRebalanceData,
-                        final StreamsMetadataState streamsMetadataState
+                        final StreamsMetadataState streamsMetadataState,
+                        final StreamsThreadMetricsDelegatingReporter 
metricsReporter
                         ) {
         super(threadId);
         this.stateLock = new Object();
@@ -807,6 +810,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         this.shutdownErrorHook = shutdownErrorHook;
         this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
         this.cacheResizer = cacheResizer;
+        this.metricsReporter = metricsReporter;
 
         // The following sensors are created here but their references are not 
stored in this object, since within
         // this object they are not recorded. The sensors are created here so 
that the stream threads starts with all
@@ -1964,6 +1968,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         }
         streamsMetrics.removeAllThreadLevelSensors(getName());
         streamsMetrics.removeAllThreadLevelMetrics(getName());
+        streamsMetrics.metricsRegistry().removeReporter(metricsReporter);
 
         setState(State.DEAD);
 
@@ -2170,4 +2175,8 @@ public class StreamThread extends Thread implements 
ProcessingThread {
     Optional<StreamsRebalanceData> streamsRebalanceData() {
         return streamsRebalanceData;
     }
+
+    StreamsMetricsImpl streamsMetrics() {
+        return streamsMetrics;
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index c440aaf478a..d1da7eb235a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -52,6 +52,7 @@ import org.apache.kafka.common.metrics.KafkaMetricsContext;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
 import org.apache.kafka.common.serialization.Serdes;
@@ -72,6 +73,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.errors.TopologyException;
+import 
org.apache.kafka.streams.internals.metrics.StreamsThreadMetricsDelegatingReporter;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
@@ -1496,6 +1498,7 @@ public class StreamThreadTest {
             HANDLER,
             null,
             Optional.empty(),
+            null,
             null
         ).updateThreadMetadata(adminClientId(CLIENT_ID));
 
@@ -2726,6 +2729,7 @@ public class StreamThreadTest {
             HANDLER,
             null,
             Optional.empty(),
+            null,
             null
         ) {
             @Override
@@ -2787,6 +2791,7 @@ public class StreamThreadTest {
             HANDLER,
             null,
             Optional.empty(),
+            null,
             null
         ) {
             @Override
@@ -2856,6 +2861,7 @@ public class StreamThreadTest {
             HANDLER,
             null,
             Optional.empty(),
+            null,
             null
         ) {
             @Override
@@ -2922,6 +2928,7 @@ public class StreamThreadTest {
             HANDLER,
             null,
             Optional.empty(),
+            null,
             null
         ) {
             @Override
@@ -2985,6 +2992,7 @@ public class StreamThreadTest {
             HANDLER,
             null,
             Optional.empty(),
+            null,
             null
         ) {
             @Override
@@ -3220,6 +3228,7 @@ public class StreamThreadTest {
             HANDLER,
             null,
             Optional.empty(),
+            null,
             null
         );
         final MetricName testMetricName = new MetricName("test_metric", "", 
"", new HashMap<>());
@@ -3279,6 +3288,7 @@ public class StreamThreadTest {
             (e, b) -> { },
             null,
             Optional.empty(),
+            null,
             null
         ) {
             @Override
@@ -3887,7 +3897,8 @@ public class StreamThreadTest {
             HANDLER,
             null,
             Optional.of(streamsRebalanceData),
-            streamsMetadataState
+            streamsMetadataState,
+            null
         ).updateThreadMetadata(adminClientId(CLIENT_ID));
 
         thread.setState(State.STARTING);
@@ -3947,7 +3958,8 @@ public class StreamThreadTest {
                 HANDLER,
                 null,
                 Optional.of(streamsRebalanceData),
-                streamsMetadataState
+                streamsMetadataState,
+                null
         ).updateThreadMetadata(adminClientId(CLIENT_ID));
 
         thread.setState(State.STARTING);
@@ -4016,7 +4028,8 @@ public class StreamThreadTest {
                 HANDLER,
                 null,
                 Optional.of(streamsRebalanceData),
-                streamsMetadataState
+                streamsMetadataState,
+                null
         ).updateThreadMetadata(adminClientId(CLIENT_ID));
 
         thread.setState(State.STARTING);
@@ -4076,7 +4089,8 @@ public class StreamThreadTest {
             HANDLER,
             null,
             Optional.of(streamsRebalanceData),
-            streamsMetadataState
+            streamsMetadataState,
+            null
         ).updateThreadMetadata(adminClientId(CLIENT_ID));
 
         thread.setState(State.STARTING);
@@ -4136,7 +4150,8 @@ public class StreamThreadTest {
                 HANDLER,
                 null,
                 Optional.of(streamsRebalanceData),
-                streamsMetadataState
+                streamsMetadataState,
+                null
         ).updateThreadMetadata(adminClientId(CLIENT_ID));
 
         thread.setState(State.STARTING);
@@ -4205,7 +4220,8 @@ public class StreamThreadTest {
                 HANDLER,
                 null,
                 Optional.of(streamsRebalanceData),
-                streamsMetadataState
+                streamsMetadataState,
+                null
         ).updateThreadMetadata(adminClientId(CLIENT_ID));
 
         thread.setState(State.STARTING);
@@ -4269,6 +4285,34 @@ public class StreamThreadTest {
         );
     }
 
+    @Test
+    public void shouldRemoveMetricsDelegatingReporterOnShutdown() throws 
InterruptedException {
+        thread = createStreamThread(CLIENT_ID, false, false);
+
+        final List<MetricsReporter> reportersAfterCreate = 
thread.streamsMetrics().metricsRegistry().reporters();
+        assertThat(
+                reportersAfterCreate.stream()
+                        .filter(r -> r instanceof 
StreamsThreadMetricsDelegatingReporter)
+                        .count(),
+                equalTo(1L)
+        );
+
+        thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
+        TestUtils.waitForCondition(
+                () -> thread.state() == StreamThread.State.DEAD,
+                10 * 1000,
+                "Thread never shut down."
+        );
+
+        final List<MetricsReporter> reportersAfterShutdown = 
thread.streamsMetrics().metricsRegistry().reporters();
+        assertThat(
+                reportersAfterShutdown.stream()
+                        .filter(r -> r instanceof 
StreamsThreadMetricsDelegatingReporter)
+                        .count(),
+                equalTo(0L)
+        );
+    }
+
     private StreamThread setUpThread(final Properties streamsConfigProps) {
         final StreamsConfig config = new StreamsConfig(streamsConfigProps);
         final ConsumerGroupMetadata consumerGroupMetadata = 
Mockito.mock(ConsumerGroupMetadata.class);
@@ -4301,6 +4345,7 @@ public class StreamThreadTest {
             null,
             null,
             Optional.empty(),
+            null,
             null
         );
     }
@@ -4424,6 +4469,7 @@ public class StreamThreadTest {
             HANDLER,
             null,
             Optional.empty(),
+            null,
             null
         );
     }

Reply via email to