This is an automated email from the ASF dual-hosted git repository.
AndrewJSchofield pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new db6b5af2b4d KAFKA-20549: Change ownership of DLQ state manager. [4/N]
(#22300)
db6b5af2b4d is described below
commit db6b5af2b4d0a0f599da6e9c09c33959c49b7dd8
Author: Sushant Mahajan <[email protected]>
AuthorDate: Mon May 18 02:33:56 2026 +0530
KAFKA-20549: Change ownership of DLQ state manager. [4/N] (#22300)
* Since `ShareGroupDLQStateManager` life cycle is meant to be controlled
by `DefaultShareGroupDLQManager`, construction of the former has been
encapsulated in the latter.
Reviewers: Andrew Schofield <[email protected]>
---
.../server/share/dlq/DefaultShareGroupDLQManager.java | 16 +++++++++-------
1 file changed, 9 insertions(+), 7 deletions(-)
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/DefaultShareGroupDLQManager.java
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/DefaultShareGroupDLQManager.java
index c06af86d014..47fed728f23 100644
---
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/DefaultShareGroupDLQManager.java
+++
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/DefaultShareGroupDLQManager.java
@@ -17,6 +17,10 @@
package org.apache.kafka.server.share.dlq;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.util.timer.Timer;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,14 +39,14 @@ public class DefaultShareGroupDLQManager implements
ShareGroupDLQManager {
private static final Logger log =
LoggerFactory.getLogger(DefaultShareGroupDLQManager.class);
- public static ShareGroupDLQManager instance(ShareGroupDLQStateManager
stateManager) {
- DefaultShareGroupDLQManager instance = new
DefaultShareGroupDLQManager(stateManager);
+ public static ShareGroupDLQManager instance(KafkaClient client,
ShareGroupDLQMetadataCacheHelper cacheHelper, Time time, Timer timer) {
+ DefaultShareGroupDLQManager instance = new
DefaultShareGroupDLQManager(client, cacheHelper, time, timer);
instance.start();
return instance;
}
- private DefaultShareGroupDLQManager(ShareGroupDLQStateManager
stateManager) {
- this.stateManager = stateManager;
+ private DefaultShareGroupDLQManager(KafkaClient client,
ShareGroupDLQMetadataCacheHelper cacheHelper, Time time, Timer timer) {
+ this.stateManager = new ShareGroupDLQStateManager(client, cacheHelper,
time, timer);
}
private void start() {
@@ -63,9 +67,7 @@ public class DefaultShareGroupDLQManager implements
ShareGroupDLQManager {
@Override
public void stop() {
try {
- if (stateManager != null) {
- stateManager.stop();
- }
+ stateManager.stop();
} catch (Exception e) {
log.error("Unable to stop DLQ state manager", e);
}