This is an automated email from the ASF dual-hosted git repository.

AndrewJSchofield pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new db6b5af2b4d KAFKA-20549: Change ownership of DLQ state manager. [4/N] 
(#22300)
db6b5af2b4d is described below

commit db6b5af2b4d0a0f599da6e9c09c33959c49b7dd8
Author: Sushant Mahajan <[email protected]>
AuthorDate: Mon May 18 02:33:56 2026 +0530

    KAFKA-20549: Change ownership of DLQ state manager. [4/N] (#22300)
    
    * Since `ShareGroupDLQStateManager` life cycle is meant to be controlled
    by `DefaultShareGroupDLQManager`, construction of the former has been
    encapsulated in the latter.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../server/share/dlq/DefaultShareGroupDLQManager.java    | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/DefaultShareGroupDLQManager.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/DefaultShareGroupDLQManager.java
index c06af86d014..47fed728f23 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/DefaultShareGroupDLQManager.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/DefaultShareGroupDLQManager.java
@@ -17,6 +17,10 @@
 
 package org.apache.kafka.server.share.dlq;
 
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.util.timer.Timer;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,14 +39,14 @@ public class DefaultShareGroupDLQManager implements 
ShareGroupDLQManager {
 
     private static final Logger log = 
LoggerFactory.getLogger(DefaultShareGroupDLQManager.class);
 
-    public static ShareGroupDLQManager instance(ShareGroupDLQStateManager 
stateManager) {
-        DefaultShareGroupDLQManager instance = new 
DefaultShareGroupDLQManager(stateManager);
+    public static ShareGroupDLQManager instance(KafkaClient client, 
ShareGroupDLQMetadataCacheHelper cacheHelper, Time time, Timer timer) {
+        DefaultShareGroupDLQManager instance = new 
DefaultShareGroupDLQManager(client, cacheHelper, time, timer);
         instance.start();
         return instance;
     }
 
-    private DefaultShareGroupDLQManager(ShareGroupDLQStateManager 
stateManager) {
-        this.stateManager = stateManager;
+    private DefaultShareGroupDLQManager(KafkaClient client, 
ShareGroupDLQMetadataCacheHelper cacheHelper, Time time, Timer timer) {
+        this.stateManager = new ShareGroupDLQStateManager(client, cacheHelper, 
time, timer);
     }
 
     private void start() {
@@ -63,9 +67,7 @@ public class DefaultShareGroupDLQManager implements 
ShareGroupDLQManager {
     @Override
     public void stop() {
         try {
-            if (stateManager != null) {
-                stateManager.stop();
-            }
+            stateManager.stop();
         } catch (Exception e) {
             log.error("Unable to stop DLQ state manager", e);
         }

Reply via email to