This is an automated email from the ASF dual-hosted git repository.

AndrewJSchofield pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bbd81e8d000 KAFKA-20549: Share group DLQ manager implementation. 
Skeleton classes [1/N] (#22209)
bbd81e8d000 is described below

commit bbd81e8d000cea9e6a663f8b4a0d656880a82721
Author: Sushant Mahajan <[email protected]>
AuthorDate: Wed May 6 01:09:47 2026 +0530

    KAFKA-20549: Share group DLQ manager implementation. Skeleton classes [1/N] 
(#22209)
    
    * Added the default share group DLQ manager implementation,
    `DefaultShareGroupDLQManager`.
    * Added a base implementation of the share group DLQ state manager,
    `ShareGroupDLQStateManager`.
    * Renamed the `ShareGroupDLQ` interface to `ShareGroupDLQManager` and
    extended it with a `void stop()` method for cleanup.
    * The new classes are standalone at the moment and are not yet plugged
    into the rest of the code.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 checkstyle/import-control-server-common.xml        |   5 +
 .../java/kafka/server/share/SharePartition.java    |   8 +-
 .../share/dlq/DefaultShareGroupDLQManager.java     | 108 ++++++++++++++
 .../server/share/dlq/NoOpShareGroupDLQManager.java |   9 +-
 ...hareGroupDLQ.java => ShareGroupDLQManager.java} |   9 +-
 .../share/dlq/ShareGroupDLQRecordParameter.java    |   2 +-
 .../share/dlq/ShareGroupDLQStateManager.java       | 160 +++++++++++++++++++++
 7 files changed, 292 insertions(+), 9 deletions(-)

diff --git a/checkstyle/import-control-server-common.xml 
b/checkstyle/import-control-server-common.xml
index 92da434568e..fd44a4a3f4b 100644
--- a/checkstyle/import-control-server-common.xml
+++ b/checkstyle/import-control-server-common.xml
@@ -113,6 +113,11 @@
             <allow pkg="org.apache.kafka.server.util" />
             <allow pkg="org.apache.kafka.test" />
           </subpackage>
+          <subpackage name="dlq">
+            <allow pkg="org.apache.kafka.clients" />
+            <allow pkg="org.apache.kafka.server.util" />
+            <allow pkg="org.apache.kafka.test" />
+          </subpackage>
         </subpackage>
 
         <subpackage name="util">
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index 4044c94cbcc..7e86a4477a6 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -43,7 +43,7 @@ import 
org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
 import 
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider;
 import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
 import org.apache.kafka.server.share.dlq.NoOpShareGroupDLQManager;
-import org.apache.kafka.server.share.dlq.ShareGroupDLQ;
+import org.apache.kafka.server.share.dlq.ShareGroupDLQManager;
 import org.apache.kafka.server.share.dlq.ShareGroupDLQRecordParameter;
 import org.apache.kafka.server.share.fetch.AcquisitionLockTimeoutHandler;
 import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
@@ -332,7 +332,7 @@ public class SharePartition {
     /**
      * Reference to the dlq manager implementation.
      */
-    private final ShareGroupDLQ shareGroupDLQ = new NoOpShareGroupDLQManager();
+    private final ShareGroupDLQManager shareGroupDLQ = new 
NoOpShareGroupDLQManager();
 
     /**
      * Supplier to toggle dlq support.
@@ -2335,7 +2335,7 @@ public class SharePartition {
                     // mapping between bytes and record state type. All ack 
types have been added except for RENEW which
                     // has been handled above.
                     RecordState recordState = recordStateWithDlq(ackType);
-                    Throwable dlqCause = recordState == RecordState.ARCHIVING 
? ShareGroupDLQ.CLIENT_REJECT : null;
+                    Throwable dlqCause = recordState == RecordState.ARCHIVING 
? ShareGroupDLQManager.CLIENT_REJECT : null;
                     if (recordState == null) {
                         return Optional.of(new 
IllegalArgumentException("Unknown acknowledge type id: " + ackType));
                     }
@@ -2416,7 +2416,7 @@ public class SharePartition {
             // either released or moved to a state where member id existence 
is not important. The member id
             // is only important when the batch is acquired.
             RecordState recordState = recordStateWithDlq(ackType);
-            Throwable dlqCause = recordState == RecordState.ARCHIVING ? 
ShareGroupDLQ.CLIENT_REJECT : null;
+            Throwable dlqCause = recordState == RecordState.ARCHIVING ? 
ShareGroupDLQManager.CLIENT_REJECT : null;
             if (recordState == null) {
                 return Optional.of(new IllegalArgumentException("Unknown 
acknowledge type id: " + ackType));
             }
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/DefaultShareGroupDLQManager.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/DefaultShareGroupDLQManager.java
new file mode 100644
index 00000000000..64361aa0564
--- /dev/null
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/DefaultShareGroupDLQManager.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.share.dlq;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The default share group DLQ manager responsible for processing
+ * incoming messages and writing them to the appropriate dlq topic.
+ */
+public class DefaultShareGroupDLQManager implements ShareGroupDLQManager {
+    private static final Logger log = LoggerFactory.getLogger(DefaultShareGroupDLQManager.class);
+
+    /**
+     * Reference to state manager responsible for actually sending
+     * the relevant RPCs and writing records.
+     */
+    private final ShareGroupDLQStateManager stateManager;
+
+    /**
+     * Creates a DLQ manager backed by the given state manager and starts that state manager.
+     * <p>
+     * This factory is static because the constructor is private: an instance-level
+     * factory could never be reached, as no instance could be created to call it on.
+     *
+     * @param stateManager The state manager used to dispatch DLQ requests. Must not be null.
+     * @return A manager whose state manager has been started.
+     * @throws IllegalArgumentException If the state manager is null.
+     */
+    public static DefaultShareGroupDLQManager instance(ShareGroupDLQStateManager stateManager) {
+        DefaultShareGroupDLQManager manager = new DefaultShareGroupDLQManager(stateManager);
+        manager.start();
+        return manager;
+    }
+
+    private DefaultShareGroupDLQManager(ShareGroupDLQStateManager stateManager) {
+        // Fail fast here rather than with a NullPointerException in start().
+        if (stateManager == null) {
+            throw new IllegalArgumentException("State manager must not be null.");
+        }
+        this.stateManager = stateManager;
+    }
+
+    private void start() {
+        this.stateManager.start();
+    }
+
+    /**
+     * Validates the parameter and hands it over to the state manager.
+     *
+     * @param param Information about the record(s) to be dead letter queued.
+     * @return The future returned by the state manager, or an exceptionally completed
+     *         future if the parameter fails validation.
+     */
+    @Override
+    public CompletableFuture<Void> enqueue(ShareGroupDLQRecordParameter param) {
+        try {
+            validate(param);
+        } catch (Exception e) {
+            log.error("Unable to validate dlq record parameters", e);
+            return CompletableFuture.failedFuture(e);
+        }
+        return stateManager.dlq(param);
+    }
+
+    /**
+     * Stops the underlying state manager. Failures are logged and not propagated.
+     */
+    @Override
+    public void stop() {
+        try {
+            stateManager.stop();
+        } catch (InterruptedException e) {
+            // Restore the interrupt status so callers further up the stack can observe it.
+            Thread.currentThread().interrupt();
+            log.error("Unable to stop DLQ state manager", e);
+        } catch (Exception e) {
+            log.error("Unable to stop DLQ state manager", e);
+        }
+    }
+
+    /**
+     * Checks that the parameter carries a usable group id, topic id partition and offset range.
+     *
+     * @param param The parameter to validate.
+     * @throws IllegalArgumentException If any of the checks fails.
+     */
+    private static void validate(ShareGroupDLQRecordParameter param) {
+        String prefix = "DLQ records parameters";
+        if (param == null) {
+            throw new IllegalArgumentException(prefix + " cannot be null.");
+        }
+
+        if (param.groupId() == null || param.groupId().isEmpty()) {
+            throw new IllegalArgumentException(prefix + " group cannot be null or empty.");
+        }
+
+        if (param.topicIdPartition() == null) {
+            throw new IllegalArgumentException(prefix + " topic/partition data cannot be null or empty.");
+        }
+
+        if (param.topicIdPartition().topicId() == null) {
+            throw new IllegalArgumentException(prefix + " topic id data cannot be null or empty.");
+        }
+
+        if (param.topicIdPartition().partition() < 0) {
+            throw new IllegalArgumentException(prefix + " partition cannot be negative.");
+        }
+
+        // The sign checks run before the ordering check; in the opposite order the
+        // negative last offset check can never be reached.
+        if (param.firstOffset() < 0) {
+            throw new IllegalArgumentException(prefix + " first offset cannot be negative.");
+        }
+
+        if (param.lastOffset() < 0) {
+            throw new IllegalArgumentException(prefix + " last offset cannot be negative.");
+        }
+
+        if (param.lastOffset() < param.firstOffset()) {
+            throw new IllegalArgumentException(prefix + " last offset cannot be less than first offset.");
+        }
+    }
+}
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/NoOpShareGroupDLQManager.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/NoOpShareGroupDLQManager.java
index cd7b971b47b..ea4f5367d1b 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/NoOpShareGroupDLQManager.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/NoOpShareGroupDLQManager.java
@@ -23,11 +23,11 @@ import org.slf4j.LoggerFactory;
 import java.util.concurrent.CompletableFuture;
 
 /**
- * A no op implementation of {@link ShareGroupDLQ}. This will be useful
+ * A no op implementation of {@link ShareGroupDLQManager}. This will be useful
  * in development cycle and testing. All methods return immediately with
  * a successfully completed future.
  */
-public class NoOpShareGroupDLQManager implements ShareGroupDLQ {
+public class NoOpShareGroupDLQManager implements ShareGroupDLQManager {
     private static final Logger log = 
LoggerFactory.getLogger(NoOpShareGroupDLQManager.class);
 
     @Override
@@ -35,4 +35,9 @@ public class NoOpShareGroupDLQManager implements 
ShareGroupDLQ {
         log.trace("Enqueuing share group dlq record parameter: {}", param);
         return CompletableFuture.completedFuture(null);
     }
+
+    @Override
+    public void stop() {
+        // Intentionally empty: this no-op implementation performs no cleanup.
+    }
 }
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQ.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQManager.java
similarity index 88%
rename from 
server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQ.java
rename to 
server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQManager.java
index 96bf848076c..f320b316009 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQ.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQManager.java
@@ -22,7 +22,7 @@ import java.util.concurrent.CompletableFuture;
 /**
  * The main interface to identify implementations of dead letter queues for 
share groups.
  */
-public interface ShareGroupDLQ {
+public interface ShareGroupDLQManager {
     class ShareGroupDLQThrowable extends Throwable {
         ShareGroupDLQThrowable(String message) {
             // We don't want the stack trace to be filled.
@@ -34,11 +34,16 @@ public interface ShareGroupDLQ {
     Throwable DELIVERY_COUNT_EXCEEDED = new ShareGroupDLQThrowable("Offset 
delivery count exceeded the threshold.");
 
     /**
-     * Main method exposed to the world to enqueuing a record to the share 
groups dead letter queue.
+     * Main method exposed to the world to enqueue a record to the share 
groups dead letter queue.
      *
      * @param param A java record encapsulating required and optional 
information about the kafka record
      *              being dead letter queued.
      * @return A completable future of Void type, mainly to signal exceptions.
      */
     CompletableFuture<Void> enqueue(ShareGroupDLQRecordParameter param);
+
+    /**
+     * Perform cleanup and interrupt any threads.
+     */
+    void stop();
 }
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordParameter.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordParameter.java
index 465279d1229..2e7c728542d 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordParameter.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordParameter.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.TopicIdPartition;
 import java.util.Optional;
 
 /**
- * Record representing information needed from callers of {@link 
ShareGroupDLQ#enqueue}. Inclusion
+ * Record representing information needed from callers of {@link 
ShareGroupDLQManager#enqueue}. Inclusion
  * of first and last offset allows passing batch information as well.
  *
  * @param groupId            The share group id of the message being recorded.
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
new file mode 100644
index 00000000000..771c18720a5
--- /dev/null
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.share.dlq;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import 
org.apache.kafka.server.share.persister.ShareCoordinatorMetadataCacheHelper;
+import org.apache.kafka.server.util.InterBrokerSendThread;
+import org.apache.kafka.server.util.RequestAndCompletionHandler;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Core implementation of RPC send logic for the dlq manager.
+ * This class allows for enqueuing records meant to be DLQ'ed
+ * and manages various RPC which are to be sent to the KafkaApis.
+ * These RPCs include PRODUCE, CREATE_TOPIC.
+ */
+public class ShareGroupDLQStateManager {
+    // Flipped by start()/stop() so that each of them acts at most once per transition.
+    private final AtomicBoolean isStarted = new AtomicBoolean(false);
+    private final SendThread sender;
+    private final Time time;
+    // timer and cacheHelper are stored but not read anywhere in this class yet.
+    private final Timer timer;
+    private final ShareCoordinatorMetadataCacheHelper cacheHelper;
+
+    /**
+     * The RPC types named by this manager.
+     */
+    public enum RPCType {
+        PRODUCE,
+        CREATE_TOPIC
+    }
+
+    /**
+     * Builds the state manager and its send thread. The thread is not started here;
+     * call {@link #start()} for that.
+     *
+     * @param client      The client handed to the send thread.
+     * @param cacheHelper The metadata cache helper.
+     * @param time        The time source; also used to seed the send thread's random generator.
+     * @param timer       The timer.
+     * @throws IllegalArgumentException If any argument is null.
+     */
+    public ShareGroupDLQStateManager(KafkaClient client, ShareCoordinatorMetadataCacheHelper cacheHelper, Time time, Timer timer) {
+        if (client == null) {
+            throw new IllegalArgumentException("Kafkaclient must not be null.");
+        }
+
+        if (cacheHelper == null) {
+            throw new IllegalArgumentException("Cache helper must not be null.");
+        }
+
+        if (time == null) {
+            throw new IllegalArgumentException("Time must not be null.");
+        }
+
+        if (timer == null) {
+            throw new IllegalArgumentException("Timer must not be null.");
+        }
+
+        this.time = time;
+        this.timer = timer;
+        this.cacheHelper = cacheHelper;
+        this.sender = new SendThread(
+            "ShareGroupDLQSendThread",
+            client,
+            Math.toIntExact(CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS),  //30 seconds
+            this.time,
+            true,
+            new Random(this.time.milliseconds())
+        );
+    }
+
+    /**
+     * Starts the send thread. Calls made while the manager is already marked as
+     * started do nothing.
+     */
+    public void start() {
+        // compareAndSet has already set the flag to true. Setting it again after
+        // sender.start() could overwrite a concurrent stop() and leave the flag
+        // reporting "started" for a sender that has been shut down.
+        if (isStarted.compareAndSet(false, true)) {
+            this.sender.start();
+        }
+    }
+
+    /**
+     * Shuts down the send thread if the manager is currently marked as started;
+     * otherwise does nothing.
+     *
+     * @throws Exception If shutting down the send thread fails.
+     */
+    public void stop() throws Exception {
+        if (isStarted.compareAndSet(true, false)) {
+            this.sender.shutdown();
+        }
+    }
+
+    /**
+     * Enqueues a {@link ShareGroupDLQRecordParameter} based on which records will be DLQ'ed.
+     * The actual record written to the DLQ topic will be built by fetching information from this argument.
+     *
+     * @param param Reference comprising offset information
+     * @return A future completing normally on successful DLQ, exceptionally otherwise.
+     */
+    public CompletableFuture<Void> dlq(ShareGroupDLQRecordParameter param) {
+        ProduceRequestHandler requestHandler = new ProduceRequestHandler(param);
+        sender.enqueue(requestHandler);
+        // The response value is dropped; callers only see completion or failure.
+        return requestHandler.result().thenAccept(response -> {
+        });
+    }
+
+    /**
+     * Base type for handlers queued on the send thread. A handler supplies a request
+     * builder and a future for the response, and is the completion callback itself.
+     */
+    private abstract class ShareGroupDLQStateManagerHandler implements RequestCompletionHandler {
+        protected abstract AbstractRequest.Builder<? extends AbstractRequest> requestBuilder();
+
+        protected abstract CompletableFuture<? extends AbstractResponse> result();
+    }
+
+    /**
+     * Placeholder handler for the PRODUCE path. It currently ignores its parameter,
+     * builds no request and reports an already completed future.
+     */
+    private class ProduceRequestHandler extends ShareGroupDLQStateManagerHandler {
+
+        ProduceRequestHandler(ShareGroupDLQRecordParameter param) {
+        }
+
+        @Override
+        protected AbstractRequest.Builder<? extends AbstractRequest> requestBuilder() {
+            return null;
+        }
+
+        @Override
+        protected CompletableFuture<? extends AbstractResponse> result() {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        @Override
+        public void onComplete(ClientResponse response) {
+
+        }
+    }
+
+    /**
+     * Send thread holding the queue of pending handlers. Handlers are only collected
+     * for now: {@link #generateRequests()} returns an empty list, so nothing is sent.
+     */
+    private static class SendThread extends InterBrokerSendThread {
+        private final ConcurrentLinkedQueue<ShareGroupDLQStateManager.ShareGroupDLQStateManagerHandler> queue = new ConcurrentLinkedQueue<>();
+        // Stored but not read anywhere in this class yet.
+        private final Random random;
+
+        SendThread(String name, KafkaClient client, int requestTimeoutMs, Time time, boolean isInterruptible, Random random) {
+            super(name, client, requestTimeoutMs, time, isInterruptible);
+            this.random = random;
+        }
+
+        @Override
+        public Collection<RequestAndCompletionHandler> generateRequests() {
+            return List.of();
+        }
+
+        /**
+         * Adds the handler to the pending queue and then calls the inherited wakeup().
+         *
+         * @param handler The handler to queue.
+         */
+        public void enqueue(ShareGroupDLQStateManager.ShareGroupDLQStateManagerHandler handler) {
+            queue.add(handler);
+            // NOTE(review): wakeup() is defined in InterBrokerSendThread, not shown here;
+            // presumably it prompts the send loop to look for new requests - confirm.
+            wakeup();
+        }
+    }
+}

Reply via email to