This is an automated email from the ASF dual-hosted git repository.

AndrewJSchofield pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ddec811aa5d KAFKA-19937: Introduced Shared ReaperThread for Persister 
/ NetworkPartitionMetadataClient (#21842)
ddec811aa5d is described below

commit ddec811aa5d2a9fc2b4e0cd6e792d0339cbb9398
Author: Rion Williams <[email protected]>
AuthorDate: Tue Apr 28 03:58:58 2026 -0500

    KAFKA-19937: Introduced Shared ReaperThread for Persister / 
NetworkPartitionMetadataClient (#21842)
    
    ### Description
    
    This pull request addresses the redundant thread usage detailed in
    [KAFKA-19937](https://issues.apache.org/jira/browse/KAFKA-19937)
    affecting the `PersisterStateManager` and
    `NetworkPartitionMetadataClient` classes specifically. Presently, each
    creates and manages its own separate `SystemTimerReaper` instance, even
    though both rely on identical timers with independent tasks. The proposed
    changes address this by introducing a single, shareable instance of the
    reaper thread to reduce overhead.
    
    ### Key Changes
    - Updated `BrokerServer` to create a single shared `SystemTimerReaper`
    instance used by both `PersisterStateManager` and
    `NetworkPartitionMetadataClient`, with cleanup in the shutdown path
    after both components have been stopped.
    - Moved ownership of the timer for the affected classes to the caller
    (e.g., `PersisterStateManager.stop()` and
    `NetworkPartitionMetadataClient.close()` no longer close their injected
    timer, as its lifecycle is managed by `BrokerServer`).
      - This timer ownership behavior is documented in the Javadoc
    for both `PersisterStateManager` and `NetworkPartitionMetadataClient`.
    - Added null validation to the `SystemTimerReaper` constructor
    arguments.
    
    ### Tests and Verification
    Verified that all existing test suites still pass as expected and added
    the following to verify new behavior and usage related to the above
    changes:
    - Extended `SystemTimerReaperTest.java` to verify null validation of the
    constructor arguments and timer-sharing behavior (e.g., two consumers
    sharing a timer can both schedule and expire tasks independently).
    - Updated `PersisterStateManagerTest.java` to verify that `stop()` does
    not close the timer, consistent with the new caller-ownership contract.
    
    ### Reviewer(s)
    
    Tagging @AndrewJSchofield (initial reporter)
    
    Reviewers: Sushant Mahajan <[email protected]>, Andrew Schofield
     <[email protected]>
---
 .../src/main/scala/kafka/server/BrokerServer.scala | 16 ++++++-----
 .../share/persister/PersisterStateManager.java     |  9 +++++--
 .../kafka/server/util/timer/SystemTimerReaper.java |  6 +++--
 .../share/persister/PersisterStateManagerTest.java |  3 ++-
 .../server/util/timer/SystemTimerReaperTest.java   | 31 ++++++++++++++++++++++
 .../util/NetworkPartitionMetadataClient.java       |  9 +++++--
 6 files changed, 61 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 5f190a18869..53811dc9f21 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -54,7 +54,7 @@ import 
org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, KafkaYamm
 import org.apache.kafka.server.network.{EndpointReadyFutures, 
KafkaAuthorizerServerInfo}
 import org.apache.kafka.server.share.persister.{DefaultStatePersister, 
NoOpStatePersister, Persister, PersisterStateManager}
 import org.apache.kafka.server.share.session.ShareSessionCache
-import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper}
+import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper, 
Timer}
 import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler, 
NetworkPartitionMetadataClient, PartitionMetadataClient}
 import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, 
BrokerLifecycleManager, ClientMetricsManager, DefaultApiVersionManager, 
DelayedActionQueue, FetchManager, FetchSessionCacheShard, KRaftTopicCreator, 
NodeToControllerChannelManagerImpl, ProcessRole, RaftControllerNodeProvider}
 import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
@@ -168,6 +168,8 @@ class BrokerServer(
 
   var persister: Persister = _
 
+  private var shareGroupTimer: Timer = _
+
   private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): 
Boolean = {
     lock.lock()
     try {
@@ -381,6 +383,9 @@ class BrokerServer(
       /* create share coordinator */
       shareCoordinator = createShareCoordinator()
 
+      /* create shared timer for share group components */
+      shareGroupTimer = new SystemTimerReaper("share-group-reaper", new 
SystemTimer("share-group"))
+
       /* create persister */
       persister = createShareStatePersister()
 
@@ -655,7 +660,7 @@ class BrokerServer(
       ),
       Time.SYSTEM,
       config.interBrokerListenerName(),
-      new SystemTimerReaper("network-partition-metadata-client-reaper", new 
SystemTimer("network-partition-metadata-client"))
+      shareGroupTimer
     )
   }
 
@@ -734,10 +739,7 @@ class BrokerServer(
               NetworkUtils.buildNetworkClient("Persister", config, metrics, 
Time.SYSTEM, new LogContext(s"[Persister broker=${config.brokerId}]")),
               new ShareCoordinatorMetadataCacheHelperImpl(metadataCache, key 
=> shareCoordinator.partitionFor(key), config.interBrokerListenerName),
               Time.SYSTEM,
-              new SystemTimerReaper(
-                "persister-state-manager-reaper",
-                new SystemTimer("persister")
-              )
+              shareGroupTimer
             )
           )
       } else if (klass.getName.equals(classOf[NoOpStatePersister].getName)) {
@@ -887,6 +889,8 @@ class BrokerServer(
       if (persister != null)
         Utils.swallow(this.logger.underlying, () => persister.stop())
 
+      Utils.closeQuietly(shareGroupTimer, "share group timer")
+
       if (lifecycleManager != null)
         Utils.swallow(this.logger.underlying, () => lifecycleManager.close())
 
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
index 28c4c866f39..00e2b645235 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
@@ -54,7 +54,6 @@ import 
org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
 import org.apache.kafka.common.requests.WriteShareGroupStateRequest;
 import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.common.utils.internals.ExponentialBackoffManager;
 import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.server.util.InterBrokerSendThread;
@@ -127,6 +126,13 @@ public class PersisterStateManager {
         UNKNOWN
     }
 
+    /**
+     * Creates a new PersisterStateManager.
+     *
+     * <p>The caller retains ownership of the supplied {@link Timer} and is 
responsible for
+     * closing it. {@link #stop()} will not close the timer, allowing it to be 
shared with
+     * other components.
+     */
     public PersisterStateManager(KafkaClient client, 
ShareCoordinatorMetadataCacheHelper cacheHelper, Time time, Timer timer) {
         if (client == null) {
             throw new IllegalArgumentException("Kafkaclient must not be 
null.");
@@ -166,7 +172,6 @@ public class PersisterStateManager {
     public void stop() throws Exception {
         if (isStarted.compareAndSet(true, false)) {
             this.sender.shutdown();
-            Utils.closeQuietly(this.timer, "PersisterStateManager timer");
         }
     }
 
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/util/timer/SystemTimerReaper.java
 
b/server-common/src/main/java/org/apache/kafka/server/util/timer/SystemTimerReaper.java
index 5c7fa4d8ca4..bc2275e4c4a 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/util/timer/SystemTimerReaper.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/util/timer/SystemTimerReaper.java
@@ -18,6 +18,8 @@ package org.apache.kafka.server.util.timer;
 
 import org.apache.kafka.server.util.ShutdownableThread;
 
+import java.util.Objects;
+
 /**
  * SystemTimerReaper wraps a {@link Timer} and starts a reaper thread
  * to expire the tasks in the {@link Timer}.
@@ -44,8 +46,8 @@ public class SystemTimerReaper implements Timer {
     private final Reaper reaper;
 
     public SystemTimerReaper(String reaperThreadName, Timer timer) {
-        this.timer = timer;
-        this.reaper = new Reaper(reaperThreadName);
+        this.timer = Objects.requireNonNull(timer, "timer must not be null");
+        this.reaper = new Reaper(Objects.requireNonNull(reaperThreadName, 
"reaperThreadName must not be null"));
         this.reaper.start();
     }
 
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
 
b/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
index ffc89dcfdcf..fec9cb65483 100644
--- 
a/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
+++ 
b/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
@@ -4827,7 +4827,8 @@ class PersisterStateManagerTest {
             psm.stop();
 
             verify(client, times(1)).close();
-            verify(timer, times(1)).close();
+            // Timer lifecycle is the caller's responsibility, not 
PersisterStateManager's.
+            verify(timer, times(0)).close();
         } catch (Exception e) {
             fail("unexpected exception", e);
         }
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/util/timer/SystemTimerReaperTest.java
 
b/server-common/src/test/java/org/apache/kafka/server/util/timer/SystemTimerReaperTest.java
index a340bc5229b..c8ae54e02d5 100644
--- 
a/server-common/src/test/java/org/apache/kafka/server/util/timer/SystemTimerReaperTest.java
+++ 
b/server-common/src/test/java/org/apache/kafka/server/util/timer/SystemTimerReaperTest.java
@@ -24,6 +24,8 @@ import org.mockito.Mockito;
 
 import java.util.concurrent.CompletableFuture;
 
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
 public class SystemTimerReaperTest {
     private static class FutureTimerTask<T> extends TimerTask {
         CompletableFuture<T> future = new CompletableFuture<>();
@@ -67,4 +69,33 @@ public class SystemTimerReaperTest {
         Mockito.verify(timer, Mockito.times(1)).close();
         TestUtils.waitForCondition(timerReaper::isShutdown, "reaper not 
shutdown");
     }
+
+    @Test
+    public void testSharedTimerBetweenConsumers() throws Exception {
+        try (Timer timer = new SystemTimerReaper("shared-reaper", new 
SystemTimer("shared-timer"))) {
+            // Set up two independent consumer tasks to the same timer
+            CompletableFuture<Void> consumer1Task = add(timer, 100L);
+            CompletableFuture<Void> consumer2Task = add(timer, 200L);
+
+            TestUtils.assertFutureThrows(TimeoutException.class, 
consumer1Task);
+            TestUtils.assertFutureThrows(TimeoutException.class, 
consumer2Task);
+
+            // After the first consumer's tasks have completed (simulating one 
consumer
+            // stopping), the second consumer can still schedule and expire 
tasks as expected
+            CompletableFuture<Void> consumer2LateTasks = add(timer, 100L);
+            TestUtils.assertFutureThrows(TimeoutException.class, 
consumer2LateTasks);
+        }
+    }
+
+    @Test
+    public void testRejectsNullName() {
+        assertThrows(NullPointerException.class, () ->
+            new SystemTimerReaper(null, Mockito.mock(Timer.class)));
+    }
+
+    @Test
+    public void testRejectsNullTimer() {
+        assertThrows(NullPointerException.class, () ->
+            new SystemTimerReaper("reaper", null));
+    }
 }
diff --git 
a/server/src/main/java/org/apache/kafka/server/util/NetworkPartitionMetadataClient.java
 
b/server/src/main/java/org/apache/kafka/server/util/NetworkPartitionMetadataClient.java
index e1ef0f8a503..51967081af2 100644
--- 
a/server/src/main/java/org/apache/kafka/server/util/NetworkPartitionMetadataClient.java
+++ 
b/server/src/main/java/org/apache/kafka/server/util/NetworkPartitionMetadataClient.java
@@ -31,7 +31,6 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
 import org.apache.kafka.common.requests.ListOffsetsResponse;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.common.utils.internals.ExponentialBackoffManager;
 import org.apache.kafka.metadata.MetadataCache;
 import org.apache.kafka.server.util.timer.Timer;
@@ -68,6 +67,13 @@ public class NetworkPartitionMetadataClient implements 
PartitionMetadataClient {
     private volatile SendThread sendThread;
     private final Timer timer;
 
+    /**
+     * Creates a new NetworkPartitionMetadataClient.
+     *
+     * <p>The caller retains ownership of the supplied {@link Timer} and is 
responsible for
+     * closing it. {@link #close()} will not close the timer, allowing it to 
be shared with
+     * other components.
+     */
     public NetworkPartitionMetadataClient(MetadataCache metadataCache,
                                           Supplier<KafkaClient> 
networkClientSupplier,
                                           Time time, ListenerName 
listenerName, Timer timer) {
@@ -149,7 +155,6 @@ public class NetworkPartitionMetadataClient implements 
PartitionMetadataClient {
     public void close() {
         // Only close sendThread if it was initialized. Note, close is called 
only during broker shutdown, so need
         // for further synchronization here.
-        Utils.closeQuietly(timer, "NetworkPartitionMetadataClient timer");
         if (!initialized.get()) {
             return;
         }

Reply via email to