This is an automated email from the ASF dual-hosted git repository.
AndrewJSchofield pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ddec811aa5d KAFKA-19937: Introduced Shared ReaperThread for Persister
/ NetworkPartitionMetadataClient (#21842)
ddec811aa5d is described below
commit ddec811aa5d2a9fc2b4e0cd6e792d0339cbb9398
Author: Rion Williams <[email protected]>
AuthorDate: Tue Apr 28 03:58:58 2026 -0500
KAFKA-19937: Introduced Shared ReaperThread for Persister /
NetworkPartitionMetadataClient (#21842)
### Description
This pull request addresses the redundant thread usage detailed in
[KAFKA-19937](https://issues.apache.org/jira/browse/KAFKA-19937)
affecting the `PersisterStateManager` and
`NetworkPartitionMetadataClient` classes specifically. Presently each
creates/manages its own separate `SystemTimerReaper` instance, yet both rely
on identical timers with independent tasks. The proposed changes address
this by introducing a new, sharable instance of the thread to reduce
overhead.
### Key Changes
- Updated `BrokerServer` to create a single shared `SystemTimerReaper`
instance used by both `PersisterStateManager` and
`NetworkPartitionMetadataClient`, with cleanup in the shutdown path
after both components have been stopped.
- Moved timer ownership for the affected classes to the caller
(e.g., `PersisterStateManager.stop()` and
`NetworkPartitionMetadataClient.close()` no longer close their injected
timer, as its lifecycle is managed by `BrokerServer`).
- This specific timer ownership behavior is documented via JavaDocs
for both `PersisterStateManager` and `NetworkPartitionMetadataClient`
- Added null validation to the `SystemTimerReaper` constructor
arguments.
### Tests and Verification
Verified that all existing test suites still pass as expected and added
the following to verify new behavior and usage related to the above
changes:
- Extended `SystemTimerReaperTest.java` to verify null validation and
timer-sharing behavior (e.g., two consumers sharing a timer can both
schedule and expire tasks independently).
- Updated `PersisterStateManagerTest.java` to verify that `stop()` does
not close the timer, consistent with the new caller-ownership contract.
### Reviewer(s)
Tagging @AndrewJSchofield (initial reporter)
Reviewers: Sushant Mahajan <[email protected]>, Andrew Schofield
<[email protected]>
---
.../src/main/scala/kafka/server/BrokerServer.scala | 16 ++++++-----
.../share/persister/PersisterStateManager.java | 9 +++++--
.../kafka/server/util/timer/SystemTimerReaper.java | 6 +++--
.../share/persister/PersisterStateManagerTest.java | 3 ++-
.../server/util/timer/SystemTimerReaperTest.java | 31 ++++++++++++++++++++++
.../util/NetworkPartitionMetadataClient.java | 9 +++++--
6 files changed, 61 insertions(+), 13 deletions(-)
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 5f190a18869..53811dc9f21 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -54,7 +54,7 @@ import
org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, KafkaYamm
import org.apache.kafka.server.network.{EndpointReadyFutures,
KafkaAuthorizerServerInfo}
import org.apache.kafka.server.share.persister.{DefaultStatePersister,
NoOpStatePersister, Persister, PersisterStateManager}
import org.apache.kafka.server.share.session.ShareSessionCache
-import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper}
+import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper,
Timer}
import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler,
NetworkPartitionMetadataClient, PartitionMetadataClient}
import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures,
BrokerLifecycleManager, ClientMetricsManager, DefaultApiVersionManager,
DelayedActionQueue, FetchManager, FetchSessionCacheShard, KRaftTopicCreator,
NodeToControllerChannelManagerImpl, ProcessRole, RaftControllerNodeProvider}
import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
@@ -168,6 +168,8 @@ class BrokerServer(
var persister: Persister = _
+ private var shareGroupTimer: Timer = _
+
private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus):
Boolean = {
lock.lock()
try {
@@ -381,6 +383,9 @@ class BrokerServer(
/* create share coordinator */
shareCoordinator = createShareCoordinator()
+ /* create shared timer for share group components */
+ shareGroupTimer = new SystemTimerReaper("share-group-reaper", new
SystemTimer("share-group"))
+
/* create persister */
persister = createShareStatePersister()
@@ -655,7 +660,7 @@ class BrokerServer(
),
Time.SYSTEM,
config.interBrokerListenerName(),
- new SystemTimerReaper("network-partition-metadata-client-reaper", new
SystemTimer("network-partition-metadata-client"))
+ shareGroupTimer
)
}
@@ -734,10 +739,7 @@ class BrokerServer(
NetworkUtils.buildNetworkClient("Persister", config, metrics,
Time.SYSTEM, new LogContext(s"[Persister broker=${config.brokerId}]")),
new ShareCoordinatorMetadataCacheHelperImpl(metadataCache, key
=> shareCoordinator.partitionFor(key), config.interBrokerListenerName),
Time.SYSTEM,
- new SystemTimerReaper(
- "persister-state-manager-reaper",
- new SystemTimer("persister")
- )
+ shareGroupTimer
)
)
} else if (klass.getName.equals(classOf[NoOpStatePersister].getName)) {
@@ -887,6 +889,8 @@ class BrokerServer(
if (persister != null)
Utils.swallow(this.logger.underlying, () => persister.stop())
+ Utils.closeQuietly(shareGroupTimer, "share group timer")
+
if (lifecycleManager != null)
Utils.swallow(this.logger.underlying, () => lifecycleManager.close())
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
index 28c4c866f39..00e2b645235 100644
---
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
+++
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
@@ -54,7 +54,6 @@ import
org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
import org.apache.kafka.common.requests.WriteShareGroupStateRequest;
import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.utils.internals.ExponentialBackoffManager;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.util.InterBrokerSendThread;
@@ -127,6 +126,13 @@ public class PersisterStateManager {
UNKNOWN
}
+ /**
+ * Creates a new PersisterStateManager.
+ *
+ * <p>The caller retains ownership of the supplied {@link Timer} and is
responsible for
+ * closing it. {@link #stop()} will not close the timer, allowing it to be
shared with
+ * other components.
+ */
public PersisterStateManager(KafkaClient client,
ShareCoordinatorMetadataCacheHelper cacheHelper, Time time, Timer timer) {
if (client == null) {
throw new IllegalArgumentException("Kafkaclient must not be
null.");
@@ -166,7 +172,6 @@ public class PersisterStateManager {
public void stop() throws Exception {
if (isStarted.compareAndSet(true, false)) {
this.sender.shutdown();
- Utils.closeQuietly(this.timer, "PersisterStateManager timer");
}
}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/util/timer/SystemTimerReaper.java
b/server-common/src/main/java/org/apache/kafka/server/util/timer/SystemTimerReaper.java
index 5c7fa4d8ca4..bc2275e4c4a 100644
---
a/server-common/src/main/java/org/apache/kafka/server/util/timer/SystemTimerReaper.java
+++
b/server-common/src/main/java/org/apache/kafka/server/util/timer/SystemTimerReaper.java
@@ -18,6 +18,8 @@ package org.apache.kafka.server.util.timer;
import org.apache.kafka.server.util.ShutdownableThread;
+import java.util.Objects;
+
/**
* SystemTimerReaper wraps a {@link Timer} and starts a reaper thread
* to expire the tasks in the {@link Timer}.
@@ -44,8 +46,8 @@ public class SystemTimerReaper implements Timer {
private final Reaper reaper;
public SystemTimerReaper(String reaperThreadName, Timer timer) {
- this.timer = timer;
- this.reaper = new Reaper(reaperThreadName);
+ this.timer = Objects.requireNonNull(timer, "timer must not be null");
+ this.reaper = new Reaper(Objects.requireNonNull(reaperThreadName,
"reaperThreadName must not be null"));
this.reaper.start();
}
diff --git
a/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
b/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
index ffc89dcfdcf..fec9cb65483 100644
---
a/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
+++
b/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
@@ -4827,7 +4827,8 @@ class PersisterStateManagerTest {
psm.stop();
verify(client, times(1)).close();
- verify(timer, times(1)).close();
+ // Timer lifecycle is the caller's responsibility, not
PersisterStateManager's.
+ verify(timer, times(0)).close();
} catch (Exception e) {
fail("unexpected exception", e);
}
diff --git
a/server-common/src/test/java/org/apache/kafka/server/util/timer/SystemTimerReaperTest.java
b/server-common/src/test/java/org/apache/kafka/server/util/timer/SystemTimerReaperTest.java
index a340bc5229b..c8ae54e02d5 100644
---
a/server-common/src/test/java/org/apache/kafka/server/util/timer/SystemTimerReaperTest.java
+++
b/server-common/src/test/java/org/apache/kafka/server/util/timer/SystemTimerReaperTest.java
@@ -24,6 +24,8 @@ import org.mockito.Mockito;
import java.util.concurrent.CompletableFuture;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
public class SystemTimerReaperTest {
private static class FutureTimerTask<T> extends TimerTask {
CompletableFuture<T> future = new CompletableFuture<>();
@@ -67,4 +69,33 @@ public class SystemTimerReaperTest {
Mockito.verify(timer, Mockito.times(1)).close();
TestUtils.waitForCondition(timerReaper::isShutdown, "reaper not
shutdown");
}
+
+ @Test
+ public void testSharedTimerBetweenConsumers() throws Exception {
+ try (Timer timer = new SystemTimerReaper("shared-reaper", new
SystemTimer("shared-timer"))) {
+ // Set up two independent consumer tasks to the same timer
+ CompletableFuture<Void> consumer1Task = add(timer, 100L);
+ CompletableFuture<Void> consumer2Task = add(timer, 200L);
+
+ TestUtils.assertFutureThrows(TimeoutException.class,
consumer1Task);
+ TestUtils.assertFutureThrows(TimeoutException.class,
consumer2Task);
+
+ // After the first consumer's tasks have completed (simulating one
consumer
+ // stopping), the second consumer can still schedule and expire
tasks as expected
+ CompletableFuture<Void> consumer2LateTasks = add(timer, 100L);
+ TestUtils.assertFutureThrows(TimeoutException.class,
consumer2LateTasks);
+ }
+ }
+
+ @Test
+ public void testRejectsNullName() {
+ assertThrows(NullPointerException.class, () ->
+ new SystemTimerReaper(null, Mockito.mock(Timer.class)));
+ }
+
+ @Test
+ public void testRejectsNullTimer() {
+ assertThrows(NullPointerException.class, () ->
+ new SystemTimerReaper("reaper", null));
+ }
}
diff --git
a/server/src/main/java/org/apache/kafka/server/util/NetworkPartitionMetadataClient.java
b/server/src/main/java/org/apache/kafka/server/util/NetworkPartitionMetadataClient.java
index e1ef0f8a503..51967081af2 100644
---
a/server/src/main/java/org/apache/kafka/server/util/NetworkPartitionMetadataClient.java
+++
b/server/src/main/java/org/apache/kafka/server/util/NetworkPartitionMetadataClient.java
@@ -31,7 +31,6 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.utils.internals.ExponentialBackoffManager;
import org.apache.kafka.metadata.MetadataCache;
import org.apache.kafka.server.util.timer.Timer;
@@ -68,6 +67,13 @@ public class NetworkPartitionMetadataClient implements
PartitionMetadataClient {
private volatile SendThread sendThread;
private final Timer timer;
+ /**
+ * Creates a new NetworkPartitionMetadataClient.
+ *
+ * <p>The caller retains ownership of the supplied {@link Timer} and is
responsible for
+ * closing it. {@link #close()} will not close the timer, allowing it to
be shared with
+ * other components.
+ */
public NetworkPartitionMetadataClient(MetadataCache metadataCache,
Supplier<KafkaClient>
networkClientSupplier,
Time time, ListenerName
listenerName, Timer timer) {
@@ -149,7 +155,6 @@ public class NetworkPartitionMetadataClient implements
PartitionMetadataClient {
public void close() {
// Only close sendThread if it was initialized. Note, close is called
only during broker shutdown, so need
// for further synchronization here.
- Utils.closeQuietly(timer, "NetworkPartitionMetadataClient timer");
if (!initialized.get()) {
return;
}