This is an automated email from the ASF dual-hosted git repository.

AndrewJSchofield pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d1e82797919 KAFKA-20549: Implemented PRODUCE RPC handler for DLQ state 
manager. [5/N] (#22368)
d1e82797919 is described below

commit d1e82797919887e480cbda3d9fb8ba7b4dde0be8
Author: Sushant Mahajan <[email protected]>
AuthorDate: Wed May 27 22:10:24 2026 +0530

    KAFKA-20549: Implemented PRODUCE RPC handler for DLQ state manager. [5/N] 
(#22368)
    
    * Produce RPC impl.
    * Batching and coalescing added.
    * Unit tests for `ShareGroupDLQStateManager`.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../ShareCoordinatorMetadataCacheHelperImpl.java   |  48 +-
 ...hareCoordinatorMetadataCacheHelperImplTest.java | 178 +++-
 .../dlq/ShareGroupDLQMetadataCacheHelper.java      |  38 +-
 .../share/dlq/ShareGroupDLQStateManager.java       | 433 ++++++++-
 .../share/dlq/ShareGroupDLQStateManagerTest.java   | 972 +++++++++++++++++++++
 5 files changed, 1627 insertions(+), 42 deletions(-)

diff --git 
a/core/src/main/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImpl.java
 
b/core/src/main/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImpl.java
index ad556514891..6e0667cc6a9 100644
--- 
a/core/src/main/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImpl.java
+++ 
b/core/src/main/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImpl.java
@@ -18,6 +18,8 @@
 package kafka.server.share;
 
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.message.MetadataResponseData;
 import org.apache.kafka.common.network.ListenerName;
@@ -29,10 +31,12 @@ import org.apache.kafka.metadata.MetadataCache;
 import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.server.share.dlq.ShareGroupDLQMetadataCacheHelper;
 import 
org.apache.kafka.server.share.persister.ShareCoordinatorMetadataCacheHelper;
+import org.apache.kafka.storage.internals.log.LogConfig;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
@@ -89,14 +93,14 @@ public class ShareCoordinatorMetadataCacheHelperImpl 
implements ShareCoordinator
     @Override
     public boolean isDlqEnabledOnTopic(String topic) {
         Properties props = metadataCache.topicConfig(topic);
-        if (props == null || props.isEmpty()) {
+        if (props == null) {
             return false;
         }
-        Object isEnabled = 
props.get(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG);
-        if (isEnabled instanceof Boolean) {
-            return (boolean) isEnabled;
+        try {
+            return new 
LogConfig(props).getBoolean(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG);
+        } catch (ConfigException exe) {
+            return false;
         }
-        return false;
     }
 
     @Override
@@ -137,7 +141,7 @@ public class ShareCoordinatorMetadataCacheHelperImpl 
implements ShareCoordinator
                 }
             }
         } catch (Exception e) {
-            log.warn("Exception while getting share coordinator", e);
+            log.warn("Exception while getting share coordinator.", e);
         }
         return Node.noNode();
     }
@@ -147,8 +151,38 @@ public class ShareCoordinatorMetadataCacheHelperImpl 
implements ShareCoordinator
         try {
             return metadataCache.getAliveBrokerNodes(interBrokerListenerName);
         } catch (Exception e) {
-            log.warn("Exception while getting cluster nodes", e);
+            log.warn("Exception while getting cluster nodes.", e);
         }
         return List.of();
     }
+
+    @Override
+    public Optional<String> topicName(Uuid topicId) {
+        try {
+            return metadataCache.getTopicName(topicId);
+        } catch (Exception e) {
+            log.warn("Exception while fetching topic name.", e);
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public TopicPartitionData topicPartitionData(String topicName) {
+        Uuid topicId = metadataCache.getTopicId(topicName);
+        Optional<Integer> numPartitions = 
metadataCache.numPartitions(topicName);
+        List<Node> partitionLeaders = new ArrayList<>();
+
+        if (numPartitions.isPresent()) {
+            for (int i = 0; i < numPartitions.get(); i++) {
+                
partitionLeaders.add(metadataCache.getPartitionLeaderEndpoint(topicName, i, 
interBrokerListenerName).orElse(null));
+            }
+        }
+
+        return new TopicPartitionData(
+            topicName,
+            numPartitions,
+            Optional.ofNullable(topicId == Uuid.ZERO_UUID ? null : topicId),
+            partitionLeaders
+        );
+    }
 }
diff --git 
a/core/src/test/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImplTest.java
 
b/core/src/test/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImplTest.java
index 69e8e235dd2..713a4d78909 100644
--- 
a/core/src/test/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImplTest.java
+++ 
b/core/src/test/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImplTest.java
@@ -29,10 +29,12 @@ import org.apache.kafka.coordinator.group.GroupConfig;
 import org.apache.kafka.coordinator.group.GroupConfigManager;
 import org.apache.kafka.metadata.MetadataCache;
 import org.apache.kafka.server.share.SharePartitionKey;
+import org.apache.kafka.server.share.dlq.ShareGroupDLQMetadataCacheHelper;
 import 
org.apache.kafka.server.share.persister.ShareCoordinatorMetadataCacheHelper;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
@@ -667,7 +669,7 @@ public class ShareCoordinatorMetadataCacheHelperImplTest {
     }
 
     @Test
-    public void testIsDlqEnabledOnTopicReturnsFalseWhenConfigValueNotBoolean() 
{
+    public void testIsDlqEnabledOnTopicReturnsTrue() {
         MetadataCache mockMetadataCache = mock(MetadataCache.class);
         Properties props = new Properties();
         props.put(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG, 
"true");
@@ -680,14 +682,14 @@ public class ShareCoordinatorMetadataCacheHelperImplTest {
             mock(GroupConfigManager.class)
         );
 
-        assertFalse(cache.isDlqEnabledOnTopic("test-topic"));
+        assertTrue(cache.isDlqEnabledOnTopic("test-topic"));
     }
 
     @Test
-    public void testIsDlqEnabledOnTopicReturnsTrue() {
+    public void testIsDlqEnabledOnTopicReturnsFalse() {
         MetadataCache mockMetadataCache = mock(MetadataCache.class);
         Properties props = new Properties();
-        props.put(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG, 
true);
+        props.put(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG, 
false);
         when(mockMetadataCache.topicConfig("test-topic")).thenReturn(props);
 
         ShareCoordinatorMetadataCacheHelperImpl cache = new 
ShareCoordinatorMetadataCacheHelperImpl(
@@ -697,15 +699,16 @@ public class ShareCoordinatorMetadataCacheHelperImplTest {
             mock(GroupConfigManager.class)
         );
 
-        assertTrue(cache.isDlqEnabledOnTopic("test-topic"));
+        assertFalse(cache.isDlqEnabledOnTopic("test-topic"));
     }
 
+    // Tests for topicName
+
     @Test
-    public void testIsDlqEnabledOnTopicReturnsFalseValue() {
+    public void testTopicNameReturnsNameWhenPresent() {
         MetadataCache mockMetadataCache = mock(MetadataCache.class);
-        Properties props = new Properties();
-        props.put(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG, 
false);
-        when(mockMetadataCache.topicConfig("test-topic")).thenReturn(props);
+        Uuid topicId = Uuid.randomUuid();
+        
when(mockMetadataCache.getTopicName(topicId)).thenReturn(Optional.of("some-topic"));
 
         ShareCoordinatorMetadataCacheHelperImpl cache = new 
ShareCoordinatorMetadataCacheHelperImpl(
             mockMetadataCache,
@@ -714,6 +717,161 @@ public class ShareCoordinatorMetadataCacheHelperImplTest {
             mock(GroupConfigManager.class)
         );
 
-        assertFalse(cache.isDlqEnabledOnTopic("test-topic"));
+        assertEquals(Optional.of("some-topic"), cache.topicName(topicId));
+        verify(mockMetadataCache, times(1)).getTopicName(topicId);
+    }
+
+    @Test
+    public void testTopicNameReturnsEmptyWhenNotPresent() {
+        MetadataCache mockMetadataCache = mock(MetadataCache.class);
+        Uuid topicId = Uuid.randomUuid();
+        
when(mockMetadataCache.getTopicName(topicId)).thenReturn(Optional.empty());
+
+        ShareCoordinatorMetadataCacheHelperImpl cache = new 
ShareCoordinatorMetadataCacheHelperImpl(
+            mockMetadataCache,
+            sharePartitionKey -> 0,
+            mock(ListenerName.class),
+            mock(GroupConfigManager.class)
+        );
+
+        assertEquals(Optional.empty(), cache.topicName(topicId));
+        verify(mockMetadataCache, times(1)).getTopicName(topicId);
+    }
+
+    @Test
+    public void testTopicNameReturnsEmptyOnException() {
+        MetadataCache mockMetadataCache = mock(MetadataCache.class);
+        Uuid topicId = Uuid.randomUuid();
+        when(mockMetadataCache.getTopicName(topicId)).thenThrow(new 
RuntimeException("boom"));
+
+        ShareCoordinatorMetadataCacheHelperImpl cache = new 
ShareCoordinatorMetadataCacheHelperImpl(
+            mockMetadataCache,
+            sharePartitionKey -> 0,
+            mock(ListenerName.class),
+            mock(GroupConfigManager.class)
+        );
+
+        assertEquals(Optional.empty(), cache.topicName(topicId));
+        verify(mockMetadataCache, times(1)).getTopicName(topicId);
+    }
+
+    // Tests for topicPartitionData
+
+    @Test
+    public void testTopicPartitionDataReturnsFullData() {
+        MetadataCache mockMetadataCache = mock(MetadataCache.class);
+        ListenerName mockListenerName = mock(ListenerName.class);
+        Uuid topicId = Uuid.randomUuid();
+        Node leader0 = new Node(0, "host0", 9092);
+        Node leader1 = new Node(1, "host1", 9092);
+
+        when(mockMetadataCache.getTopicId("test-topic")).thenReturn(topicId);
+        
when(mockMetadataCache.numPartitions("test-topic")).thenReturn(Optional.of(2));
+        when(mockMetadataCache.getPartitionLeaderEndpoint("test-topic", 0, 
mockListenerName))
+            .thenReturn(Optional.of(leader0));
+        when(mockMetadataCache.getPartitionLeaderEndpoint("test-topic", 1, 
mockListenerName))
+            .thenReturn(Optional.of(leader1));
+
+        ShareCoordinatorMetadataCacheHelperImpl cache = new 
ShareCoordinatorMetadataCacheHelperImpl(
+            mockMetadataCache,
+            sharePartitionKey -> 0,
+            mockListenerName,
+            mock(GroupConfigManager.class)
+        );
+
+        ShareGroupDLQMetadataCacheHelper.TopicPartitionData data = 
cache.topicPartitionData("test-topic");
+
+        assertEquals("test-topic", data.topicName());
+        assertEquals(Optional.of(2), data.numPartitions());
+        assertEquals(Optional.of(topicId), data.topicId());
+        assertEquals(List.of(leader0, leader1), data.partitionLeaderNodes());
+
+        verify(mockMetadataCache, times(1)).getTopicId("test-topic");
+        verify(mockMetadataCache, times(1)).numPartitions("test-topic");
+        verify(mockMetadataCache, 
times(1)).getPartitionLeaderEndpoint("test-topic", 0, mockListenerName);
+        verify(mockMetadataCache, 
times(1)).getPartitionLeaderEndpoint("test-topic", 1, mockListenerName);
+    }
+
+    @Test
+    public void testTopicPartitionDataReturnsEmptyTopicIdWhenZeroUuid() {
+        MetadataCache mockMetadataCache = mock(MetadataCache.class);
+        ListenerName mockListenerName = mock(ListenerName.class);
+        Node leader0 = new Node(0, "host0", 9092);
+
+        
when(mockMetadataCache.getTopicId("test-topic")).thenReturn(Uuid.ZERO_UUID);
+        
when(mockMetadataCache.numPartitions("test-topic")).thenReturn(Optional.of(1));
+        when(mockMetadataCache.getPartitionLeaderEndpoint("test-topic", 0, 
mockListenerName))
+            .thenReturn(Optional.of(leader0));
+
+        ShareCoordinatorMetadataCacheHelperImpl cache = new 
ShareCoordinatorMetadataCacheHelperImpl(
+            mockMetadataCache,
+            sharePartitionKey -> 0,
+            mockListenerName,
+            mock(GroupConfigManager.class)
+        );
+
+        ShareGroupDLQMetadataCacheHelper.TopicPartitionData data = 
cache.topicPartitionData("test-topic");
+
+        assertEquals("test-topic", data.topicName());
+        assertEquals(Optional.of(1), data.numPartitions());
+        assertEquals(Optional.empty(), data.topicId());
+        assertEquals(List.of(leader0), data.partitionLeaderNodes());
+    }
+
+    @Test
+    public void testTopicPartitionDataWithoutNumPartitions() {
+        MetadataCache mockMetadataCache = mock(MetadataCache.class);
+        ListenerName mockListenerName = mock(ListenerName.class);
+        Uuid topicId = Uuid.randomUuid();
+
+        when(mockMetadataCache.getTopicId("test-topic")).thenReturn(topicId);
+        
when(mockMetadataCache.numPartitions("test-topic")).thenReturn(Optional.empty());
+
+        ShareCoordinatorMetadataCacheHelperImpl cache = new 
ShareCoordinatorMetadataCacheHelperImpl(
+            mockMetadataCache,
+            sharePartitionKey -> 0,
+            mockListenerName,
+            mock(GroupConfigManager.class)
+        );
+
+        ShareGroupDLQMetadataCacheHelper.TopicPartitionData data = 
cache.topicPartitionData("test-topic");
+
+        assertEquals("test-topic", data.topicName());
+        assertEquals(Optional.empty(), data.numPartitions());
+        assertEquals(Optional.of(topicId), data.topicId());
+        assertEquals(List.of(), data.partitionLeaderNodes());
+
+        verify(mockMetadataCache, times(1)).getTopicId("test-topic");
+        verify(mockMetadataCache, times(1)).numPartitions("test-topic");
+        verify(mockMetadataCache, times(0)).getPartitionLeaderEndpoint(any(), 
any(Integer.class), any());
+    }
+
+    @Test
+    public void testTopicPartitionDataWithMissingPartitionLeader() {
+        MetadataCache mockMetadataCache = mock(MetadataCache.class);
+        ListenerName mockListenerName = mock(ListenerName.class);
+        Uuid topicId = Uuid.randomUuid();
+        Node leader0 = new Node(0, "host0", 9092);
+
+        when(mockMetadataCache.getTopicId("test-topic")).thenReturn(topicId);
+        
when(mockMetadataCache.numPartitions("test-topic")).thenReturn(Optional.of(2));
+        when(mockMetadataCache.getPartitionLeaderEndpoint("test-topic", 0, 
mockListenerName))
+            .thenReturn(Optional.of(leader0));
+        when(mockMetadataCache.getPartitionLeaderEndpoint("test-topic", 1, 
mockListenerName))
+            .thenReturn(Optional.empty());
+
+        ShareCoordinatorMetadataCacheHelperImpl cache = new 
ShareCoordinatorMetadataCacheHelperImpl(
+            mockMetadataCache,
+            sharePartitionKey -> 0,
+            mockListenerName,
+            mock(GroupConfigManager.class)
+        );
+
+        ShareGroupDLQMetadataCacheHelper.TopicPartitionData data = 
cache.topicPartitionData("test-topic");
+
+        assertEquals("test-topic", data.topicName());
+        assertEquals(Optional.of(2), data.numPartitions());
+        assertEquals(Optional.of(topicId), data.topicId());
+        assertEquals(Arrays.asList(leader0, null), 
data.partitionLeaderNodes());
     }
 }
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQMetadataCacheHelper.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQMetadataCacheHelper.java
index e670676831f..bbc3949cbdc 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQMetadataCacheHelper.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQMetadataCacheHelper.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.server.share.dlq;
 
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Uuid;
 
 import java.util.List;
 import java.util.Optional;
@@ -28,11 +29,20 @@ import java.util.Optional;
  * and keeping implementations manageable.
  */
 public interface ShareGroupDLQMetadataCacheHelper {
+
+    public record TopicPartitionData(
+        String topicName,
+        Optional<Integer> numPartitions,
+        Optional<Uuid> topicId,
+        List<Node> partitionLeaderNodes
+    ) {
+    }
+
     /**
      * Return optional of string representing of DLQ topic.
      *
      * @param groupId Id of the share group
-     * @return  Optional of string representing of DLQ topic if set, empty 
otherwise
+     * @return Optional of string representing the DLQ topic if set, empty 
otherwise
      */
     Optional<String> shareGroupDlqTopic(String groupId);
 
@@ -40,7 +50,7 @@ public interface ShareGroupDLQMetadataCacheHelper {
      * Check if DLQ dynamic config is set on the topic to mark it available 
for DLQ writes.
      *
      * @param topic The name of the topic
-     * @return  Boolean which is true when DLQ is set on the topic, false 
otherwise
+     * @return Boolean which is true when DLQ is set on the topic, false 
otherwise
      */
     boolean isDlqEnabledOnTopic(String topic);
 
@@ -54,7 +64,7 @@ public interface ShareGroupDLQMetadataCacheHelper {
     /**
      * Return optional of string representing the configured DLQ prefix.
      *
-     * @return  Optional of string representing DLQ prefix if configured, 
empty otherwise
+     * @return Optional of string representing DLQ prefix if configured, empty 
otherwise
      */
     Optional<String> shareGroupDlqTopicPrefix();
 
@@ -62,7 +72,7 @@ public interface ShareGroupDLQMetadataCacheHelper {
      * Check is copy record data into DLQ topic is enabled.
      *
      * @param groupId The id of the share group
-     * @return  Boolean which is true if config is set, false otherwise
+     * @return Boolean which is true if config is set, false otherwise
      */
     boolean isShareGroupDlqCopyRecordEnabled(String groupId);
 
@@ -70,14 +80,30 @@ public interface ShareGroupDLQMetadataCacheHelper {
      * Check if a topic is present in the metadata cache.
      *
      * @param topic The name of the topic
-     * @return  Boolean which is true is topic exists, false otherwise
+     * @return Boolean which is true if topic exists, false otherwise
      */
     boolean containsTopic(String topic);
 
     /**
      * Get all nodes in the kafka cluster encapsulated in the {@link Node} 
object.
      *
-     * @return  List of nodes representing the cluster nodes
+     * @return List of nodes representing the cluster nodes
      */
     List<Node> getClusterNodes();
+
+    /**
+     * Fetch topic name, based on the topic id.
+     *
+     * @param topicId The uuid of the topic
+     * @return Optional specifying the name, or empty in case of error/not 
found.
+     */
+    Optional<String> topicName(Uuid topicId);
+
+    /**
+     * Fetch topic partition data, based on the topic name.
+     *
+     * @param topicName The name of the topic
+     * @return TopicPartitionData java record specifying the information.
+     */
+    TopicPartitionData topicPartitionData(String topicName);
 }
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
index 1bcfd1f025d..90f880ee08d 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
@@ -22,17 +22,29 @@ import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.RequestCompletionHandler;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.message.CreateTopicsRequestData;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ProduceRequestData;
+import org.apache.kafka.common.message.ProduceResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.internal.MemoryRecords;
+import org.apache.kafka.common.record.internal.SimpleRecord;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.CreateTopicsRequest;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
+import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.internals.ExponentialBackoffManager;
+import org.apache.kafka.server.config.ServerConfigs;
 import org.apache.kafka.server.util.InterBrokerSendThread;
 import org.apache.kafka.server.util.RequestAndCompletionHandler;
 import org.apache.kafka.server.util.timer.Timer;
@@ -41,10 +53,17 @@ import org.apache.kafka.server.util.timer.TimerTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -68,6 +87,10 @@ public class ShareGroupDLQStateManager {
     private static final double RETRY_BACKOFF_JITTER = 
CommonClientConfigs.RETRY_BACKOFF_JITTER;
     private static final Logger log = 
LoggerFactory.getLogger(ShareGroupDLQStateManager.class);
 
+    private final Set<Node> inFlight = new HashSet<>();
+    private final Map<Node, List<ProduceRequestHandler>> nodeRPCMap = new 
HashMap<>();
+    private final Object nodeMapLock = new Object();
+
     public ShareGroupDLQStateManager(KafkaClient client, 
ShareGroupDLQMetadataCacheHelper cacheHelper, Time time, Timer timer) {
         if (client == null) {
             throw new IllegalArgumentException("Kafkaclient must not be 
null.");
@@ -120,8 +143,13 @@ public class ShareGroupDLQStateManager {
      * @return A future completing normally on successful DLQ, exceptionally 
otherwise.
      */
     public CompletableFuture<Void> dlq(ShareGroupDLQRecordParameter param) {
+        return dlq(param, REQUEST_BACKOFF_MS, REQUEST_BACKOFF_MAX_MS, 
MAX_REQUEST_ATTEMPTS);
+    }
+
+    // Visibility for tests
+    CompletableFuture<Void> dlq(ShareGroupDLQRecordParameter param, long 
requestBackoffMs, long requestBackoffMaxMs, int maxRequestAttempts) {
         CompletableFuture<Void> future = new CompletableFuture<>();
-        ProduceRequestHandler requestHandler = new 
ProduceRequestHandler(param, future, REQUEST_BACKOFF_MS, 
REQUEST_BACKOFF_MAX_MS, MAX_REQUEST_ATTEMPTS);
+        ProduceRequestHandler requestHandler = new 
ProduceRequestHandler(param, future, requestBackoffMs, requestBackoffMaxMs, 
maxRequestAttempts);
         enqueue(requestHandler);
         return future;
     }
@@ -130,11 +158,46 @@ public class ShareGroupDLQStateManager {
         sender.enqueue(requestHandler);
     }
 
-    private class ProduceRequestHandler implements RequestCompletionHandler {
+    /**
+     * Add a produce request handler after determining that the DLQ topic 
exists
+     * or has been created by the CREATE_TOPIC RPC. The map is used to collect 
all PRODUCE
+     * requests which are destined for a specific destination node. The Sender 
class
+     * then performs coalescing on all the handlers to create one single 
PRODUCE instead
+     * of sending multiple RPCs. This method is currently called either when
+     * the DLQ topic already exists (so no CREATE_TOPIC RPC is needed) or, if
+     * it does not, after the DLQ topic has been successfully created.
+     *
+     * @param node    The destination node where the produce request needs to 
be sent.
+     * @param handler The handler instance to add to the node map.
+     */
+    private void addRequestToNodeMap(Node node, ProduceRequestHandler handler) 
{
+        if (!handler.isBatchable()) {
+            return;
+        }
+        synchronized (nodeMapLock) {
+            nodeRPCMap.computeIfAbsent(node, k -> new LinkedList<>())
+                .add(handler);
+        }
+        sender.wakeup();
+    }
+
+    // Visibility for tests
+    class ProduceRequestHandler implements RequestCompletionHandler {
         private final CompletableFuture<Void> result;
         private final ShareGroupDLQRecordParameter param;
         private static final Logger LOG = 
LoggerFactory.getLogger(ShareGroupDLQStateManager.ProduceRequestHandler.class);
         private final ExponentialBackoffManager createTopicsBackoff;
+        private final ExponentialBackoffManager produceRequestBackoff;
+        private Node dlqPartitionLeaderNode;
+        private int dlqDestinationPartition;
+        private ShareGroupDLQMetadataCacheHelper.TopicPartitionData 
dlqTopicPartitionData;
+
+        public static final String HEADER_DLQ_ERRORS_TOPIC = 
"__dlq.errors.topic";
+        public static final String HEADER_DLQ_ERRORS_PARTITION = 
"__dlq.errors.partition";
+        public static final String HEADER_DLQ_ERRORS_OFFSET = 
"__dlq.errors.offset";
+        public static final String HEADER_DLQ_ERRORS_GROUP = 
"__dlq.errors.group";
+        public static final String HEADER_DLQ_ERRORS_DELIVERY_COUNT = 
"__dlq.errors.delivery.count";
+        public static final String HEADER_DLQ_ERRORS_MESSAGE = 
"__dlq.errors.message";
 
         public ProduceRequestHandler(
             ShareGroupDLQRecordParameter param,
@@ -152,6 +215,13 @@ public class ShareGroupDLQStateManager {
                 backoffMaxMs,
                 RETRY_BACKOFF_JITTER
             );
+            this.produceRequestBackoff = new ExponentialBackoffManager(
+                maxRPCRetryAttempts,
+                backoffMs,
+                RETRY_BACKOFF_EXP_BASE,
+                backoffMaxMs,
+                RETRY_BACKOFF_JITTER
+            );
         }
 
         @Override
@@ -165,8 +235,8 @@ public class ShareGroupDLQStateManager {
 
             if (response.requestHeader().apiKey() == ApiKeys.CREATE_TOPICS) {
                 handleCreateTopicsResponse(response);
-            } else {
-                // handle the response
+            } else if (response.requestHeader().apiKey() == ApiKeys.PRODUCE) {
+                handleProduceResponse(response);
             }
 
             sender.wakeup();
@@ -176,6 +246,19 @@ public class ShareGroupDLQStateManager {
             return "ProduceRequestHandler";
         }
 
+        /**
+         * This method helps determine if the handler could
+         * participate in batching (added to nodeMap). This will
+         * be helpful if the RPCs which cannot be batched are included in
+         * this class as well.
+         *
+         * @return Boolean indicating whether this handler can be coalesced 
with others
+         * to reduce number of RPCs sent.
+         */
+        boolean isBatchable() {
+            return true;
+        }
+
         public void requestErrorResponse(Throwable exception) {
             this.result.completeExceptionally(exception);
         }
@@ -204,6 +287,66 @@ public class ShareGroupDLQStateManager {
                 .setTopics(topicCollection));
         }
 
+        public AbstractRequest.Builder<? extends AbstractRequest> 
requestBuilder() {
+            throw new RuntimeException("Produce requests are batchable, hence 
individual requests not needed.");
+        }
+
+        public void populateDLQTopicData() throws ConfigException {
+            Optional<String> dlqTopic = 
cacheHelper.shareGroupDlqTopic(param.groupId());
+            if (dlqTopic.isEmpty()) {
+                throw new ConfigException(String.format("DLQ topic is not 
configured for share group %s.", param.groupId()));
+            }
+
+            ShareGroupDLQMetadataCacheHelper.TopicPartitionData tpData = 
cacheHelper.topicPartitionData(dlqTopic.get());
+
+            if (tpData.topicId().isEmpty()) {
+                throw new ConfigException(String.format("DLQ topic id could 
not be found for share group %s with DLQ topic %s.", param.groupId(), 
dlqTopic.get()));
+            }
+
+            if (tpData.numPartitions().isEmpty()) {
+                throw new ConfigException(String.format("DLQ topic partition 
count could not be found for share group %s with DLQ topic %s.", 
param.groupId(), dlqTopic.get()));
+            }
+
+            if (tpData.partitionLeaderNodes().isEmpty() || 
tpData.partitionLeaderNodes().size() != tpData.numPartitions().get()) {
+                throw new ConfigException(String.format("DLQ topic partition 
leaders for share group %s with DLQ topic %s could not be found.", 
param.groupId(), dlqTopic.get()));
+            }
+
+            this.dlqDestinationPartition = 
param.topicIdPartition().partition() % tpData.numPartitions().get();
+            this.dlqPartitionLeaderNode = 
tpData.partitionLeaderNodes().get(dlqDestinationPartition);
+
+            if (this.dlqPartitionLeaderNode == null || 
this.dlqPartitionLeaderNode.equals(Node.noNode())) {
+                throw new ConfigException(String.format("DLQ topic partition 
leader node for share group %s with DLQ topic %s and partition %d could not be 
found.", param.groupId(), dlqTopic.get(), dlqDestinationPartition));
+            }
+
+            this.dlqTopicPartitionData = tpData;
+        }
+
+        public ProduceRequestData.TopicProduceData topicProduceData() {
+            List<SimpleRecord> simpleRecords = new ArrayList<>();
+            for (long i = param.firstOffset(); i <= param.lastOffset(); i++) {
+                long timestamp = time.hiResClockMs();
+                simpleRecords.add(new SimpleRecord(timestamp, (byte[]) null, 
null, headers(i)));
+            }
+
+            MemoryRecords records = MemoryRecords.withRecords(
+                Compression.NONE,
+                simpleRecords.toArray(new SimpleRecord[]{})
+            );
+
+            return new ProduceRequestData.TopicProduceData()
+                .setName(dlqTopicPartitionData.topicName())
+                .setTopicId(dlqTopicPartitionData.topicId().get())
+                .setPartitionData(List.of(
+                    new ProduceRequestData.PartitionProduceData()
+                        .setIndex(dlqDestinationPartition)  // partition
+                        .setRecords(records)
+                ));
+        }
+
+        public Node dlqPartitionLeaderNode() {
+            return this.dlqPartitionLeaderNode;
+        }
+
         public Optional<Throwable> validateDlqTopic() {
             Optional<String> topicNameOpt = 
cacheHelper.shareGroupDlqTopic(param.groupId());
             Optional<String> topicPrefix = 
cacheHelper.shareGroupDlqTopicPrefix();
@@ -238,7 +381,51 @@ public class ShareGroupDLQStateManager {
 
         public boolean dlqTopicExists() {
             Optional<String> shareGroupDlqTopic = 
cacheHelper.shareGroupDlqTopic(param.groupId());
-            return 
shareGroupDlqTopic.filter(cacheHelper::containsTopic).isPresent();
+            boolean isDlqTopicPresent = 
shareGroupDlqTopic.filter(cacheHelper::containsTopic).isPresent();
+            if (isDlqTopicPresent) {
+                try {
+                    populateDLQTopicData();
+                } catch (ConfigException e) {
+                    return false;
+                }
+                addRequestToNodeMap(dlqPartitionLeaderNode, this);
+            }
+            return isDlqTopicPresent;
+        }
+
+        @Override
+        public String toString() {
+            return "ProduceRequestHandler(" +
+                "param: " + param + "\n" +
+                "dlqTopicData: " + dlqTopicPartitionData + "\n" +
+                ")";
+        }
+
+        private Header[] headers(long offset) {
+            List<Header> headers = new ArrayList<>();
+            headers.add(new RecordHeader(HEADER_DLQ_ERRORS_TOPIC, 
recordTopic().getBytes(StandardCharsets.UTF_8)));
+            headers.add(new RecordHeader(HEADER_DLQ_ERRORS_PARTITION, 
Integer.toString(param.topicIdPartition().partition()).getBytes(StandardCharsets.UTF_8)));
+            headers.add(new RecordHeader(HEADER_DLQ_ERRORS_OFFSET, 
Long.toString(offset).getBytes(StandardCharsets.UTF_8)));
+            headers.add(new RecordHeader(HEADER_DLQ_ERRORS_GROUP, 
param.groupId().getBytes(StandardCharsets.UTF_8)));
+            param.deliveryCount().ifPresent(deliveryCount -> headers.add(
+                new RecordHeader(HEADER_DLQ_ERRORS_DELIVERY_COUNT, 
Short.toString(deliveryCount).getBytes(StandardCharsets.UTF_8))));
+            param.cause().ifPresent(cause -> {
+                if (cause.getMessage() != null) {
+                    headers.add(new RecordHeader(HEADER_DLQ_ERRORS_MESSAGE, 
cause.getMessage().getBytes(StandardCharsets.UTF_8)));
+                }
+            });
+
+            return headers.toArray(new Header[0]);
+        }
+
+        private String recordTopic() {
+            TopicIdPartition topicIdPartition = param.topicIdPartition();
+            String recordTopicName = param.topicIdPartition().topic();
+            if (recordTopicName == null || recordTopicName.isEmpty()) {
+                // If topic name lookup fails, use topic id as a String in the 
header.
+                recordTopicName = 
cacheHelper.topicName(param.topicIdPartition().topicId()).orElse(topicIdPartition.topicId().toString());
+            }
+            return recordTopicName;
         }
 
         // Visibility for testing
@@ -247,22 +434,22 @@ public class ShareGroupDLQStateManager {
                 return Optional.empty();
             }
 
-            String dlqTopicName = 
cacheHelper.shareGroupDlqTopic(param.groupId()).orElse("UNKNOWN");
+            String dlqTopicName = 
cacheHelper.shareGroupDlqTopic(param.groupId()).orElse("<UNKNOWN>");
 
-            LOG.debug("Response for RPC {} with DLQ topic {} is invalid - {}", 
name(), dlqTopicName, response);
+            LOG.debug("Response for RPC for handler {} with DLQ topic {} is 
invalid - {}.", this, dlqTopicName, response);
 
             if (response.authenticationException() != null) {
-                LOG.error("Authentication exception", 
response.authenticationException());
+                LOG.error("Authentication exception.", 
response.authenticationException());
                 Errors error = 
Errors.forException(response.authenticationException());
                 return Optional.of(error);
             } else if (response.versionMismatch() != null) {
-                LOG.error("Version mismatch exception", 
response.versionMismatch());
+                LOG.error("Version mismatch exception.", 
response.versionMismatch());
                 Errors error = Errors.forException(response.versionMismatch());
                 return Optional.of(error);
             } else if (response.wasDisconnected()) {    // Retriable
                 return Optional.of(Errors.NETWORK_EXCEPTION);
             } else if (response.wasTimedOut()) {    // Retriable
-                LOG.debug("Response for RPC {} with DLQ topic {} timed out - 
{}.", name(), dlqTopicName, response);
+                LOG.debug("Response for RPC for handler {} with DLQ topic {} 
timed out - {}.", this, dlqTopicName, response);
                 return Optional.of(Errors.REQUEST_TIMED_OUT);
             } else {
                 return Optional.of(Errors.UNKNOWN_SERVER_ERROR);
@@ -270,11 +457,11 @@ public class ShareGroupDLQStateManager {
         }
 
         private void handleCreateTopicsResponse(ClientResponse response) {
-            LOG.debug("Received CreateTopicsResponse {}", response);
+            LOG.debug("Received CreateTopicsResponse {}.", response);
             createTopicsBackoff.incrementAttempt();
             Errors clientResponseError = 
checkResponseError(response).orElse(Errors.NONE);
             String clientResponseErrorMessage = clientResponseError.message();
-            String dlqTopicName = 
cacheHelper.shareGroupDlqTopic(param.groupId()).orElse("UNKNOWN");
+            String dlqTopicName = 
cacheHelper.shareGroupDlqTopic(param.groupId()).orElse("<UNKNOWN>");
 
             switch (clientResponseError) {
                 case NONE:
@@ -292,8 +479,23 @@ public class ShareGroupDLQStateManager {
                     String errorMessage = topicResult.errorMessage();
                     switch (error) {
                         case NONE:
-                            // Replace with enqueue post PRODUCE implementation
-                            this.result.complete(null);
+                            try {
+                                populateDLQTopicData();
+                                createTopicsBackoff.resetAttempts();
+                                if (this.isBatchable()) {
+                                    
addRequestToNodeMap(this.dlqPartitionLeaderNode, this);
+                                } else {
+                                    enqueue(this);
+                                }
+                            } catch (ConfigException e) {
+                                LOG.error("Error enqueueing after DLQ create 
topic response {}.", this, e);
+                                if (!createTopicsBackoff.canAttempt()) {
+                                    LOG.error("Exhausted max retries while 
populating DLQ topic for {} using DLQ topic {} without success.", name(), 
dlqTopicName);
+                                    requestErrorResponse(new 
Exception("Exhausted max retries while populating DLQ topic without success."));
+                                    break;
+                                }
+                                timer.add(new 
ShareGroupDLQTimerTask(createTopicsBackoff.backOff(), this));
+                            }
                             break;
 
                         case TOPIC_ALREADY_EXISTS:
@@ -301,7 +503,7 @@ public class ShareGroupDLQStateManager {
                             // was in-flight. As such this request might get 
TOPIC_ALREADY_EXISTS error, which is acceptable
                             // let it try again and sender logic will take 
care of it.
                         case THROTTLING_QUOTA_EXCEEDED:
-                            LOG.debug("Received retriable error in create DLQ 
topic response for {} using DLQ topic {}: {}", name(), dlqTopicName, 
errorMessage);
+                            LOG.debug("Received retriable error in create DLQ 
topic response for {} using DLQ topic {}: {}.", name(), dlqTopicName, 
errorMessage);
                             if (!createTopicsBackoff.canAttempt()) {
                                 LOG.error("Exhausted max retries to create DLQ 
topic for {} using DLQ topic {} without success.", name(), dlqTopicName);
                                 requestErrorResponse(new Exception("Exhausted 
max retries to create DLQ topic without success."));
@@ -328,7 +530,94 @@ public class ShareGroupDLQStateManager {
                     break;
 
                 default:
-                    LOG.error("Unable to create DLQ topic due to error in 
client response for {} using DLQ topic {}: {}", name(), dlqTopicName, 
clientResponseError.code());
+                    LOG.error("Unable to create DLQ topic due to error in 
client response for {} using DLQ topic {}: {}.", name(), dlqTopicName, 
clientResponseError.code());
+                    requestErrorResponse(clientResponseError.exception());
+            }
+        }
+
+        private void handleProduceResponse(ClientResponse response) {
+            LOG.debug("Received ProduceRequestResponse {}.", response);
+            produceRequestBackoff.incrementAttempt();
+            Errors clientResponseError = 
checkResponseError(response).orElse(Errors.NONE);
+            String clientResponseErrorMessage = clientResponseError.message();
+
+            switch (clientResponseError) {
+                case NONE:
+                    // Produce response received
+                    ProduceResponse produceResponse = ((ProduceResponse) 
response.responseBody());
+                    ProduceResponseData.TopicProduceResponseCollection 
produceResponseCollection = produceResponse.data().responses();
+                    if (produceResponseCollection.isEmpty()) {
+                        LOG.error("Received empty produce response for {} to 
dlq topic node {}.", this, dlqPartitionLeaderNode());
+                        
requestErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception());
+                        break;
+                    }
+
+                    ProduceResponseData.TopicProduceResponse 
topicProduceResponse = produceResponseCollection.find(
+                        new ProduceResponseData.TopicProduceResponse()
+                            .setTopicId(dlqTopicPartitionData.topicId().get())
+                    );
+                    if (topicProduceResponse == null ||
+                        topicProduceResponse.partitionResponses().isEmpty()
+                    ) {
+                        LOG.error("Received empty topic produce response {} to 
dlq topic node {}.", this, dlqPartitionLeaderNode());
+                        
requestErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception());
+                        break;
+                    }
+
+                    List<ProduceResponseData.PartitionProduceResponse> 
partitionResponses = topicProduceResponse.partitionResponses();
+                    ProduceResponseData.PartitionProduceResponse 
partitionResponse = partitionResponses.stream().filter(res -> res.index() == 
dlqDestinationPartition)
+                        .findFirst()
+                        .orElse(null);
+
+                    if (partitionResponse == null) {
+                        LOG.error("Received empty partition produce response 
{} to dlq topic node {}.", this, dlqPartitionLeaderNode());
+                        
requestErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception());
+                        break;
+                    }
+
+                    Errors error = 
Errors.forCode(partitionResponse.errorCode());
+                    String errorMessage = partitionResponse.errorMessage();
+                    switch (error) {
+                        case NONE:
+                            LOG.debug("Successfully produced records {} to dlq 
topic node {}.", this, dlqPartitionLeaderNode());
+                            produceRequestBackoff.resetAttempts();
+                            this.result.complete(null);
+                            break;
+
+                        case NOT_LEADER_OR_FOLLOWER:
+                            LOG.debug("Received retriable error produce 
response for {} to dlq topic node {} - {}.", this, dlqPartitionLeaderNode(), 
errorMessage);
+                            if (!produceRequestBackoff.canAttempt()) {
+                                LOG.error("Exhausted max retries to produce {} 
to  DLQ topic node {}.", this, dlqPartitionLeaderNode());
+                                requestErrorResponse(new Exception("Exhausted 
max retries to produce to DLQ topic without success."));
+                                break;
+                            }
+                            timer.add(new 
ShareGroupDLQTimerTask(produceRequestBackoff.backOff(), this));
+                            break;
+
+                        default:
+                            LOG.error("Unable to produce {} to DLQ topic node 
{} - {}.", this, dlqPartitionLeaderNode(), errorMessage);
+                            
partitionResponse.recordErrors().forEach(recordError ->
+                                LOG.error("Records with errors {} - {}.", 
recordError.batchIndex(), recordError.batchIndexErrorMessage()));
+                            requestErrorResponse(error.exception());
+                    }
+                    break;
+
+                case NETWORK_EXCEPTION: // Retriable client response error 
codes.
+                case REQUEST_TIMED_OUT:
+                    LOG.debug("Received retriable error produce client 
response for {} for DLQ node {} due to {}.",
+                        param, dlqPartitionLeaderNode(), 
clientResponseErrorMessage);
+                    if (!produceRequestBackoff.canAttempt()) {
+                        LOG.error("Exhausted max retries to produce {} to  DLQ 
topic node {} due to client response error {}.",
+                            param, dlqPartitionLeaderNode(), 
clientResponseErrorMessage);
+                        requestErrorResponse(clientResponseError.exception());
+                        break;
+                    }
+                    timer.add(new 
ShareGroupDLQTimerTask(produceRequestBackoff.backOff(), this));
+                    break;
+
+                default:
+                    LOG.error("Unable to produce {} to DLQ topic node {} due 
to client response error {}.",
+                        param, dlqPartitionLeaderNode(), 
clientResponseErrorMessage);
                     requestErrorResponse(clientResponseError.exception());
             }
         }
@@ -345,6 +634,8 @@ public class ShareGroupDLQStateManager {
 
         @Override
         public Collection<RequestAndCompletionHandler> generateRequests() {
+            List<RequestAndCompletionHandler> requests = new ArrayList<>();
+
             if (!queue.isEmpty()) {
                 ShareGroupDLQStateManager.ProduceRequestHandler handler = 
queue.poll();
                 // At this point either a correctly named and configured DLQ 
topic exists or
@@ -354,7 +645,7 @@ public class ShareGroupDLQStateManager {
                     // We need to send RPC to create the topic
                     Node randomNode = randomNode();
                     if (randomNode == Node.noNode()) {
-                        log.error("Unable to find node to send create topic 
request.");
+                        log.error("Unable to find node to send create topic 
request for handler {}.", handler);
                         // fatal failure, cannot retry or progress
                         // fail the RPC
                         
handler.requestErrorResponse(Errors.BROKER_NOT_AVAILABLE.exception());
@@ -370,12 +661,81 @@ public class ShareGroupDLQStateManager {
                             handler
                         ));
                     } catch (ConfigException exp) {
-                        log.error("Unable to create topic request.", exp);
+                        log.error("Unable to create topic request for handler 
{}.", handler, exp);
                         
handler.requestErrorResponse(Errors.INVALID_CONFIG.exception());
                     }
+                } else {
+                    if (!handler.isBatchable()) {
+                        requests.add(new RequestAndCompletionHandler(
+                            time.milliseconds(),
+                            handler.dlqPartitionLeaderNode(),
+                            handler.requestBuilder(),
+                            handler
+                        ));
+                    }
                 }
             }
-            return List.of();
+
+            // {
+            //  node1: {
+            //      [P1, P2, P3]
+            //  },
+            //  node2: {
+            //      [P4, P5]
+            //  }, ...
+            // }
+            // For a sequence of produce RPCs, the flow would be:
+            // 1. 1st produce request arrives.
+            // 2. it is enqueued in the send thread.
+            // 3. wakeup event causes generateRequests to create the DLQ 
topic if required.
+            // 4. it will cause either RPC or cache lookup.
+            // 5. once complete, the produce handler is added to the nodeMap 
for batching and not the queue.
+            // 6. wakeup event causes generateRequests to iterate over the map 
and send the produce request (P1) and
+            // remove node from the nodeMap and add it to inFlight.
+            // 7. until P1 completes, more produce requests (P2, P3, ...) 
could come in and get added to the nodeMap as per points 3, 4 and 5.
+            // 8. if these belong to the same node as P1, they will not be sent 
because that node is still present in inFlight.
+            // 9. when P1 completes, it will clear inFlight and raise wakeup 
event.
+            // 10. at this point P2, P3, etc. could be sent as a combined 
request thus achieving batching.
+            final Set<Node> sending = new HashSet<>();
+            final Set<Node> emptyNodes = new HashSet<>();   // Nodes for which 
no coalesced handler was found.
+            synchronized (nodeMapLock) {
+                nodeRPCMap.forEach((destNode, handlers) -> {
+                    // this condition causes requests of same type and same 
destination node
+                    // to not be sent immediately but get batched
+                    if (!inFlight.contains(destNode)) {
+                        CoalesceResults results = 
coalesceProduceRequests(handlers);
+                        if (results.liveHandlers.isEmpty()) {
+                            emptyNodes.add(destNode);
+                            return;
+                        }
+                        requests.add(new RequestAndCompletionHandler(
+                            time.milliseconds(),
+                            destNode,
+                            results.request,
+                            response -> {
+                                inFlight.remove(destNode);
+
+                                // now the combined request has completed
+                                // we need to create responses for individual
+                                // requests which composed the combined request
+                                results.liveHandlers.forEach(handler -> 
handler.onComplete(response));
+                                wakeup();
+                            }));
+                        sending.add(destNode);
+                    }
+                });
+
+                emptyNodes.forEach(nodeRPCMap::remove);
+                sending.forEach(node -> {
+                    // we need to add these nodes to inFlight
+                    inFlight.add(node);
+
+                    // remove from nodeMap
+                    nodeRPCMap.remove(node);
+                });
+            } // close of synchronized context
+
+            return requests;
         }
 
         public void enqueue(ShareGroupDLQStateManager.ProduceRequestHandler 
handler) {
@@ -411,4 +771,39 @@ public class ShareGroupDLQStateManager {
             sender.wakeup();
         }
     }
+
+    private record CoalesceResults(
+        AbstractRequest.Builder<? extends AbstractRequest> request,
+        List<ProduceRequestHandler> liveHandlers
+    ) {
+    }
+
+    private static CoalesceResults 
coalesceProduceRequests(List<ProduceRequestHandler> handlers) {
+        Map<Uuid, ProduceRequestData.TopicProduceData> produceHandlerMap = new 
HashMap<>();
+        List<ProduceRequestHandler> liveHandlers = new 
ArrayList<>(handlers.size());
+        handlers.forEach(handler -> {
+            try {
+                ProduceRequestData.TopicProduceData topicProduceData = 
handler.topicProduceData();
+                produceHandlerMap.computeIfAbsent(topicProduceData.topicId(), 
topicId ->
+                    new ProduceRequestData.TopicProduceData()
+                        .setName(topicProduceData.name())
+                        .setTopicId(topicId)
+                ).partitionData().addAll(topicProduceData.partitionData());
+                liveHandlers.add(handler);
+            } catch (Exception exception) {
+                log.error("Unable to coalesce ProduceRequestData for handler 
{}. It will be skipped from DLQ.", handler, exception);
+                handler.requestErrorResponse(exception);
+            }
+        });
+
+        ProduceRequestData data = new ProduceRequestData()
+            .setTopicData(new 
ProduceRequestData.TopicProduceDataCollection(produceHandlerMap.values().iterator()))
+            .setAcks((short) -1)  // all replicas
+            .setTimeoutMs(ServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT);
+
+        return new CoalesceResults(
+            new ProduceRequest.Builder(ApiKeys.PRODUCE.latestVersion(), 
ApiKeys.PRODUCE.latestVersion(), data),
+            liveHandlers
+        );
+    }
 }
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManagerTest.java
 
b/server-common/src/test/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManagerTest.java
new file mode 100644
index 00000000000..a6d939003ce
--- /dev/null
+++ 
b/server-common/src/test/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManagerTest.java
@@ -0,0 +1,972 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.share.dlq;
+
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ProduceRequestData;
+import org.apache.kafka.common.message.ProduceResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.internal.MemoryRecords;
+import org.apache.kafka.common.record.internal.Record;
+import org.apache.kafka.common.requests.CreateTopicsRequest;
+import org.apache.kafka.common.requests.CreateTopicsResponse;
+import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import 
org.apache.kafka.server.share.dlq.ShareGroupDLQMetadataCacheHelper.TopicPartitionData;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.server.util.timer.MockTimer;
+import org.apache.kafka.server.util.timer.SystemTimer;
+import org.apache.kafka.server.util.timer.SystemTimerReaper;
+import org.apache.kafka.server.util.timer.Timer;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.server.share.dlq.ShareGroupDLQStateManager.ProduceRequestHandler.HEADER_DLQ_ERRORS_DELIVERY_COUNT;
+import static 
org.apache.kafka.server.share.dlq.ShareGroupDLQStateManager.ProduceRequestHandler.HEADER_DLQ_ERRORS_GROUP;
+import static 
org.apache.kafka.server.share.dlq.ShareGroupDLQStateManager.ProduceRequestHandler.HEADER_DLQ_ERRORS_MESSAGE;
+import static 
org.apache.kafka.server.share.dlq.ShareGroupDLQStateManager.ProduceRequestHandler.HEADER_DLQ_ERRORS_OFFSET;
+import static 
org.apache.kafka.server.share.dlq.ShareGroupDLQStateManager.ProduceRequestHandler.HEADER_DLQ_ERRORS_PARTITION;
+import static 
org.apache.kafka.server.share.dlq.ShareGroupDLQStateManager.ProduceRequestHandler.HEADER_DLQ_ERRORS_TOPIC;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class ShareGroupDLQStateManagerTest {
+    private static final MockTime MOCK_TIME = new MockTime();
+    private static final String HOST = "localhost";
+    private static final int PORT = 9092;
+    private static final String GROUP_ID = "test-group";
+    private static final String DLQ_TOPIC = "dlq-topic";
+    private static final Uuid DLQ_TOPIC_ID = Uuid.randomUuid();
+    private static final Uuid SOURCE_TOPIC_ID = Uuid.randomUuid();
+    private static final Node DEFAULT_LEADER = new Node(0, HOST, PORT);
+
+    private final MockTimer mockTimer = new MockTimer(MOCK_TIME);
+    private ShareGroupDLQStateManager stateManager;
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        if (stateManager != null) {
+            stateManager.stop();
+        }
+    }
+
+    private final class Builder {
+        private KafkaClient client;
+        private Time time = MOCK_TIME;
+        private Timer timer;
+        private ShareGroupDLQMetadataCacheHelper cacheHelper;
+
+        Builder withClient(KafkaClient client) {
+            this.client = client;
+            return this;
+        }
+
+        Builder withCacheHelper(ShareGroupDLQMetadataCacheHelper cacheHelper) {
+            this.cacheHelper = cacheHelper;
+            return this;
+        }
+
+        Builder withTime(Time time) {
+            this.time = time;
+            return this;
+        }
+
+        Builder withTimer(Timer timer) {
+            this.timer = timer;
+            return this;
+        }
+
+        ShareGroupDLQStateManager build() {
+            return new ShareGroupDLQStateManager(
+                client != null ? client : new MockClient(MOCK_TIME),
+                cacheHelper != null ? cacheHelper : 
happyCacheHelper(DEFAULT_LEADER),
+                time,
+                timer != null ? timer : mockTimer
+            );
+        }
+    }
+
+    private Builder builder() {
+        return new Builder();
+    }
+
+    private static ShareGroupDLQRecordParameter param() {
+        return new ShareGroupDLQRecordParameter(
+            GROUP_ID,
+            new TopicIdPartition(SOURCE_TOPIC_ID, 0, "source-topic"),
+            0L,
+            2L,
+            Optional.of((short) 1),
+            Optional.of(new RuntimeException("simulated cause")),
+            false
+        );
+    }
+
+    private static ShareGroupDLQMetadataCacheHelper happyCacheHelper(Node 
leader) {
+        ShareGroupDLQMetadataCacheHelper helper = 
mock(ShareGroupDLQMetadataCacheHelper.class);
+        
when(helper.shareGroupDlqTopic(GROUP_ID)).thenReturn(Optional.of(DLQ_TOPIC));
+        when(helper.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
+        when(helper.containsTopic(DLQ_TOPIC)).thenReturn(true);
+        when(helper.isDlqEnabledOnTopic(DLQ_TOPIC)).thenReturn(true);
+        when(helper.isDlqAutoTopicCreateEnabled()).thenReturn(true);
+        when(helper.getClusterNodes()).thenReturn(List.of(leader));
+        
when(helper.topicName(SOURCE_TOPIC_ID)).thenReturn(Optional.of("source-topic"));
+        when(helper.topicPartitionData(DLQ_TOPIC)).thenReturn(new 
TopicPartitionData(
+            DLQ_TOPIC,
+            Optional.of(1),
+            Optional.of(DLQ_TOPIC_ID),
+            List.of(leader)
+        ));
+        return helper;
+    }
+
+    private static Throwable getCause(CompletableFuture<Void> future) {
+        try {
+            future.get(5, TimeUnit.SECONDS);
+            fail("Expected the future to complete exceptionally");
+            return null;
+        } catch (ExecutionException ee) {
+            return ee.getCause();
+        } catch (InterruptedException | TimeoutException e) {
+            fail("Future did not complete", e);
+            return null;
+        }
+    }
+
+    /**
+     * Expected headers and offset range for one DLQ partition's records 
inside a captured produce
+     * request. {@code sharedHeaders} are expected to be identical on every 
record in that partition;
+     * the offset header is built per-record from {@code firstOffset}..{@code 
lastOffset}.
+     */
+    private record ExpectedDlqPartition(long firstOffset, long lastOffset, 
Map<String, String> sharedHeaders) {
+    }
+
+    /**
+     * Verifies record-level headers for every partition of the single topic carried by a captured
+     * produce request.
+     *
+     * <p>Keys of {@code expectedByPartitionIndex} are DLQ partition indices (as reported by
+     * {@link ProduceRequestData.PartitionProduceData#index()}); the set of partitions present in the
+     * request must match the keys exactly. For each partition every record must carry the expected
+     * shared headers plus an offset header that counts up from {@code firstOffset}, and the number of
+     * records must equal {@code lastOffset - firstOffset + 1}.
+     *
+     * @param request                  the captured produce request; must contain exactly one topic
+     * @param expectedByPartitionIndex expected offsets and shared headers keyed by DLQ partition index
+     */
+    private static void assertDlqProduceRecordHeaders(
+        ProduceRequest request,
+        Map<Integer, ExpectedDlqPartition> expectedByPartitionIndex
+    ) {
+        // Enforce the documented single-topic contract up front: an empty request now fails with a
+        // readable assertion instead of NoSuchElementException, and records addressed to an
+        // unexpected extra topic can no longer slip through unverified.
+        assertEquals(1, request.data().topicData().size(),
+            "Expected exactly one topic in produce request");
+        ProduceRequestData.TopicProduceData topic = request.data().topicData().iterator().next();
+        Set<Integer> actualPartitionIndices = topic.partitionData().stream()
+            .map(ProduceRequestData.PartitionProduceData::index)
+            .collect(Collectors.toSet());
+        assertEquals(expectedByPartitionIndex.keySet(), actualPartitionIndices,
+            "Unexpected set of DLQ partitions in produce request");
+
+        for (ProduceRequestData.PartitionProduceData partition : topic.partitionData()) {
+            // Non-null: the key-set equality asserted above guarantees an entry for this index.
+            ExpectedDlqPartition expected = expectedByPartitionIndex.get(partition.index());
+            MemoryRecords records = (MemoryRecords) partition.records();
+
+            long expectedOffset = expected.firstOffset();
+            long recordCount = 0;
+            for (Record record : records.records()) {
+                Map<String, String> actualHeaders = new HashMap<>();
+                for (Header h : record.headers()) {
+                    // A header value may be null; keep it as null so the map comparison below
+                    // reports the mismatch rather than failing with a NullPointerException here.
+                    byte[] value = h.value();
+                    actualHeaders.put(h.key(), value == null ? null : new String(value, StandardCharsets.UTF_8));
+                }
+                Map<String, String> expectedHeaders = new HashMap<>(expected.sharedHeaders());
+                expectedHeaders.put(HEADER_DLQ_ERRORS_OFFSET, Long.toString(expectedOffset));
+                assertEquals(expectedHeaders, actualHeaders,
+                    "Partition " + partition.index() + " record at offset " + expectedOffset + " has unexpected headers");
+                expectedOffset++;
+                recordCount++;
+            }
+            // Compare as longs so a wide offset range is never truncated by a narrowing cast.
+            assertEquals(expected.lastOffset() - expected.firstOffset() + 1, recordCount,
+                "Partition " + partition.index() + " has unexpected number of records");
+        }
+    }
+
+    // ---- Constructor null-check tests ----
+
+    @Test
+    public void testConstructorRejectsNullClient() {
+        // Every argument except the client (first position) is valid.
+        ShareGroupDLQMetadataCacheHelper helper = mock(ShareGroupDLQMetadataCacheHelper.class);
+
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new ShareGroupDLQStateManager(null, helper, MOCK_TIME, mockTimer)
+        );
+    }
+
+    @Test
+    public void testConstructorRejectsNullCacheHelper() {
+        // Every argument except the cache helper (second position) is valid.
+        KafkaClient kafkaClient = mock(KafkaClient.class);
+
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new ShareGroupDLQStateManager(kafkaClient, null, MOCK_TIME, mockTimer)
+        );
+    }
+
+    @Test
+    public void testConstructorRejectsNullTime() {
+        // Every argument except the time source (third position) is valid.
+        ShareGroupDLQMetadataCacheHelper helper = mock(ShareGroupDLQMetadataCacheHelper.class);
+        KafkaClient kafkaClient = mock(KafkaClient.class);
+
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new ShareGroupDLQStateManager(kafkaClient, helper, null, mockTimer)
+        );
+    }
+
+    @Test
+    public void testConstructorRejectsNullTimer() {
+        // Every argument except the timer (fourth position) is valid.
+        ShareGroupDLQMetadataCacheHelper helper = mock(ShareGroupDLQMetadataCacheHelper.class);
+        KafkaClient kafkaClient = mock(KafkaClient.class);
+
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new ShareGroupDLQStateManager(kafkaClient, helper, MOCK_TIME, null)
+        );
+    }
+
+    // ---- Lifecycle tests ----
+
+    @Test
+    public void testStartIsIdempotent() {
+        stateManager = builder().build();
+
+        // Call start() twice back to back; the repeated call must be harmless.
+        for (int call = 0; call < 2; call++) {
+            stateManager.start();
+        }
+        // No explicit assertion: tearDown calls stateManager.stop(), which must not throw.
+    }
+
+    @Test
+    public void testStopWithoutStartIsNoOp() {
+        // Build the manager only; start() is deliberately never invoked in this test, and the
+        // test body itself contains no assertion.
+        stateManager = builder().build();
+        // tearDown will call stateManager.stop() without a prior start() and 
must not throw.
+    }
+
+    // ---- DLQ topic validation tests (no thread start required) ----
+
+    @Test
+    public void testDlqEmptyTopicNameFailsValidation() throws Exception {
+        // The helper reports neither a DLQ topic for the group nor a DLQ topic prefix.
+        ShareGroupDLQMetadataCacheHelper helper = mock(ShareGroupDLQMetadataCacheHelper.class);
+        when(helper.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
+        when(helper.shareGroupDlqTopic(GROUP_ID)).thenReturn(Optional.empty());
+        stateManager = builder().withCacheHelper(helper).build();
+
+        // The manager is never started; the failure is read off the returned future.
+        CompletableFuture<Void> future = stateManager.dlq(param());
+        Throwable failure = getCause(future);
+
+        assertInstanceOf(ConfigException.class, failure);
+        assertTrue(failure.getMessage().contains("empty"));
+    }
+
+    @Test
+    public void testDlqTopicStartingWithUnderscoreFailsValidation() throws Exception {
+        // The configured DLQ topic name begins with a double underscore.
+        ShareGroupDLQMetadataCacheHelper helper = mock(ShareGroupDLQMetadataCacheHelper.class);
+        when(helper.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
+        when(helper.shareGroupDlqTopic(GROUP_ID)).thenReturn(Optional.of("__internal_dlq"));
+        stateManager = builder().withCacheHelper(helper).build();
+
+        CompletableFuture<Void> future = stateManager.dlq(param());
+        Throwable failure = getCause(future);
+
+        assertInstanceOf(ConfigException.class, failure);
+        assertTrue(failure.getMessage().contains("__"));
+    }
+
+    @Test
+    public void testDlqExistingTopicWithoutDlqConfigFailsValidation() throws Exception {
+        // The DLQ topic is present in metadata, but DLQ is not enabled on it.
+        ShareGroupDLQMetadataCacheHelper helper = mock(ShareGroupDLQMetadataCacheHelper.class);
+        when(helper.isDlqEnabledOnTopic(DLQ_TOPIC)).thenReturn(false);
+        when(helper.containsTopic(DLQ_TOPIC)).thenReturn(true);
+        when(helper.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
+        when(helper.shareGroupDlqTopic(GROUP_ID)).thenReturn(Optional.of(DLQ_TOPIC));
+        stateManager = builder().withCacheHelper(helper).build();
+
+        CompletableFuture<Void> future = stateManager.dlq(param());
+        Throwable failure = getCause(future);
+
+        assertInstanceOf(ConfigException.class, failure);
+        assertTrue(failure.getMessage().contains("DLQ is not enabled"));
+    }
+
+    @Test
+    public void testDlqTopicMissingAndAutoCreateDisabledFailsValidation() throws Exception {
+        // The DLQ topic is absent from metadata and auto topic creation is switched off.
+        ShareGroupDLQMetadataCacheHelper helper = mock(ShareGroupDLQMetadataCacheHelper.class);
+        when(helper.isDlqAutoTopicCreateEnabled()).thenReturn(false);
+        when(helper.containsTopic(DLQ_TOPIC)).thenReturn(false);
+        when(helper.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
+        when(helper.shareGroupDlqTopic(GROUP_ID)).thenReturn(Optional.of(DLQ_TOPIC));
+        stateManager = builder().withCacheHelper(helper).build();
+
+        CompletableFuture<Void> future = stateManager.dlq(param());
+        Throwable failure = getCause(future);
+
+        assertInstanceOf(ConfigException.class, failure);
+        assertTrue(failure.getMessage().contains("auto create is disabled"));
+    }
+
+    @Test
+    public void testDlqTopicPrefixMismatchFailsValidation() throws Exception {
+        // The topic exists with DLQ enabled, but a prefix of "required-prefix-" is configured.
+        ShareGroupDLQMetadataCacheHelper helper = mock(ShareGroupDLQMetadataCacheHelper.class);
+        when(helper.isDlqEnabledOnTopic(DLQ_TOPIC)).thenReturn(true);
+        when(helper.containsTopic(DLQ_TOPIC)).thenReturn(true);
+        when(helper.shareGroupDlqTopicPrefix()).thenReturn(Optional.of("required-prefix-"));
+        when(helper.shareGroupDlqTopic(GROUP_ID)).thenReturn(Optional.of(DLQ_TOPIC));
+        stateManager = builder().withCacheHelper(helper).build();
+
+        CompletableFuture<Void> future = stateManager.dlq(param());
+        Throwable failure = getCause(future);
+
+        assertInstanceOf(ConfigException.class, failure);
+        assertTrue(failure.getMessage().contains("does not comply with the DLQ topic prefix"));
+    }
+
+    @Test
+    public void testDlqValidationFailureCompletesFutureBeforeStart() throws Exception {
+        ShareGroupDLQMetadataCacheHelper helper = mock(ShareGroupDLQMetadataCacheHelper.class);
+        when(helper.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
+        when(helper.shareGroupDlqTopic(GROUP_ID)).thenReturn(Optional.empty());
+
+        // start() is never called here: the future returned by dlq() is expected to be completed
+        // already, i.e. the validation failure must not depend on the sender thread.
+        stateManager = builder().withCacheHelper(helper).build();
+        CompletableFuture<Void> outcome = stateManager.dlq(param());
+
+        assertTrue(outcome.isDone());
+        assertTrue(outcome.isCompletedExceptionally());
+        assertFalse(outcome.isCancelled());
+    }
+
+    // ---- Full integration tests ----
+
+    @Test
+    public void testDlqHappyPathExistingTopic() throws Exception {
+        // Record every produce request the client is asked to send and answer it successfully.
+        List<ProduceRequest> sentProduces = new ArrayList<>();
+        MockClient.RequestMatcher recordProduce = body -> {
+            if (!(body instanceof ProduceRequest produce)) {
+                return false;
+            }
+            sentProduces.add(produce);
+            return true;
+        };
+        MockClient mockClient = new MockClient(MOCK_TIME);
+        mockClient.prepareResponseFrom(recordProduce, successfulProduceResponse(0), DEFAULT_LEADER);
+
+        stateManager = builder().withClient(mockClient).build();
+        stateManager.start();
+        CompletableFuture<Void> future = stateManager.dlq(param());
+        assertNull(future.get(10, TimeUnit.SECONDS));
+
+        assertEquals(1, sentProduces.size());
+        Map<String, String> sharedHeaders = Map.of(
+            HEADER_DLQ_ERRORS_TOPIC, "source-topic",
+            HEADER_DLQ_ERRORS_PARTITION, "0",
+            HEADER_DLQ_ERRORS_GROUP, GROUP_ID,
+            HEADER_DLQ_ERRORS_DELIVERY_COUNT, "1",
+            HEADER_DLQ_ERRORS_MESSAGE, "simulated cause"
+        );
+        assertDlqProduceRecordHeaders(
+            sentProduces.get(0),
+            Map.of(0, new ExpectedDlqPartition(0L, 2L, sharedHeaders))
+        );
+    }
+
+    @Test
+    public void testDlqTopicPrefixEmptyStringSkipsPrefixCheck() throws Exception {
+        // The prefix is configured, but as an empty string; the request is still expected to
+        // go through and be produced to the DLQ topic.
+        TopicPartitionData dlqTopicData = new TopicPartitionData(
+            DLQ_TOPIC,
+            Optional.of(1),
+            Optional.of(DLQ_TOPIC_ID),
+            List.of(DEFAULT_LEADER)
+        );
+        ShareGroupDLQMetadataCacheHelper helper = mock(ShareGroupDLQMetadataCacheHelper.class);
+        when(helper.shareGroupDlqTopicPrefix()).thenReturn(Optional.of(""));
+        when(helper.shareGroupDlqTopic(GROUP_ID)).thenReturn(Optional.of(DLQ_TOPIC));
+        when(helper.isDlqAutoTopicCreateEnabled()).thenReturn(true);
+        when(helper.isDlqEnabledOnTopic(DLQ_TOPIC)).thenReturn(true);
+        when(helper.containsTopic(DLQ_TOPIC)).thenReturn(true);
+        when(helper.getClusterNodes()).thenReturn(List.of(DEFAULT_LEADER));
+        when(helper.topicPartitionData(DLQ_TOPIC)).thenReturn(dlqTopicData);
+        when(helper.topicName(SOURCE_TOPIC_ID)).thenReturn(Optional.of("source-topic"));
+
+        List<ProduceRequest> sentProduces = new ArrayList<>();
+        MockClient.RequestMatcher recordProduce = body -> {
+            if (!(body instanceof ProduceRequest produce)) {
+                return false;
+            }
+            sentProduces.add(produce);
+            return true;
+        };
+        MockClient mockClient = new MockClient(MOCK_TIME);
+        mockClient.prepareResponseFrom(recordProduce, successfulProduceResponse(0), DEFAULT_LEADER);
+
+        stateManager = builder()
+            .withClient(mockClient)
+            .withCacheHelper(helper)
+            .build();
+        stateManager.start();
+        CompletableFuture<Void> future = stateManager.dlq(param());
+        assertNull(future.get(10, TimeUnit.SECONDS));
+
+        assertEquals(1, sentProduces.size());
+        Map<String, String> sharedHeaders = Map.of(
+            HEADER_DLQ_ERRORS_TOPIC, "source-topic",
+            HEADER_DLQ_ERRORS_PARTITION, "0",
+            HEADER_DLQ_ERRORS_GROUP, GROUP_ID,
+            HEADER_DLQ_ERRORS_DELIVERY_COUNT, "1",
+            HEADER_DLQ_ERRORS_MESSAGE, "simulated cause"
+        );
+        assertDlqProduceRecordHeaders(
+            sentProduces.get(0),
+            Map.of(0, new ExpectedDlqPartition(0L, 2L, sharedHeaders))
+        );
+    }
+
+    @Test
+    public void testDlqCreateTopicThenProduceSucceeds() throws Exception {
+        // containsTopic is stubbed to false and auto-create to true, so a CreateTopics request
+        // is expected ahead of the produce request.
+        TopicPartitionData dlqTopicData = new TopicPartitionData(
+            DLQ_TOPIC,
+            Optional.of(1),
+            Optional.of(DLQ_TOPIC_ID),
+            List.of(DEFAULT_LEADER)
+        );
+        ShareGroupDLQMetadataCacheHelper helper = mock(ShareGroupDLQMetadataCacheHelper.class);
+        when(helper.containsTopic(DLQ_TOPIC)).thenReturn(false);
+        when(helper.isDlqAutoTopicCreateEnabled()).thenReturn(true);
+        when(helper.isDlqEnabledOnTopic(DLQ_TOPIC)).thenReturn(true);
+        when(helper.shareGroupDlqTopic(GROUP_ID)).thenReturn(Optional.of(DLQ_TOPIC));
+        when(helper.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
+        when(helper.getClusterNodes()).thenReturn(List.of(DEFAULT_LEADER));
+        when(helper.topicPartitionData(DLQ_TOPIC)).thenReturn(dlqTopicData);
+        when(helper.topicName(SOURCE_TOPIC_ID)).thenReturn(Optional.of("source-topic"));
+
+        List<ProduceRequest> sentProduces = new ArrayList<>();
+        MockClient.RequestMatcher isCreateTopics = body -> body instanceof CreateTopicsRequest;
+        MockClient.RequestMatcher recordProduce = body -> {
+            if (!(body instanceof ProduceRequest produce)) {
+                return false;
+            }
+            sentProduces.add(produce);
+            return true;
+        };
+        // Prepared responses are staged in order: topic creation first, then the produce.
+        MockClient mockClient = new MockClient(MOCK_TIME);
+        mockClient.prepareResponseFrom(isCreateTopics, successfulCreateTopicsResponse(), DEFAULT_LEADER);
+        mockClient.prepareResponseFrom(recordProduce, successfulProduceResponse(0), DEFAULT_LEADER);
+
+        stateManager = builder()
+            .withClient(mockClient)
+            .withCacheHelper(helper)
+            .build();
+        stateManager.start();
+        CompletableFuture<Void> future = stateManager.dlq(param());
+        assertNull(future.get(10, TimeUnit.SECONDS));
+
+        assertEquals(1, sentProduces.size());
+        Map<String, String> sharedHeaders = Map.of(
+            HEADER_DLQ_ERRORS_TOPIC, "source-topic",
+            HEADER_DLQ_ERRORS_PARTITION, "0",
+            HEADER_DLQ_ERRORS_GROUP, GROUP_ID,
+            HEADER_DLQ_ERRORS_DELIVERY_COUNT, "1",
+            HEADER_DLQ_ERRORS_MESSAGE, "simulated cause"
+        );
+        assertDlqProduceRecordHeaders(
+            sentProduces.get(0),
+            Map.of(0, new ExpectedDlqPartition(0L, 2L, sharedHeaders))
+        );
+    }
+
+    @Test
+    public void testDlqCreateTopicFatalErrorFailsFuture() throws Exception {
+        // Topic is missing and auto-create is on; the CreateTopics call is answered with
+        // INVALID_REPLICATION_FACTOR.
+        ShareGroupDLQMetadataCacheHelper helper = mock(ShareGroupDLQMetadataCacheHelper.class);
+        when(helper.containsTopic(DLQ_TOPIC)).thenReturn(false);
+        when(helper.isDlqAutoTopicCreateEnabled()).thenReturn(true);
+        when(helper.getClusterNodes()).thenReturn(List.of(DEFAULT_LEADER));
+        when(helper.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
+        when(helper.shareGroupDlqTopic(GROUP_ID)).thenReturn(Optional.of(DLQ_TOPIC));
+
+        MockClient.RequestMatcher isCreateTopics = body -> body instanceof CreateTopicsRequest;
+        MockClient mockClient = new MockClient(MOCK_TIME);
+        mockClient.prepareResponseFrom(
+            isCreateTopics,
+            createTopicsResponse(Errors.INVALID_REPLICATION_FACTOR),
+            DEFAULT_LEADER
+        );
+
+        stateManager = builder()
+            .withClient(mockClient)
+            .withCacheHelper(helper)
+            .build();
+        stateManager.start();
+        CompletableFuture<Void> future = stateManager.dlq(param());
+        Throwable failure = getCause(future);
+
+        assertNotNull(failure);
+        Class<?> expectedType = Errors.INVALID_REPLICATION_FACTOR.exception().getClass();
+        assertEquals(expectedType, failure.getClass());
+    }
+
+    @Test
+    public void testDlqCreateTopicNoClusterNodesFailsFuture() throws Exception {
+        // Topic is missing and auto-create is on, but the helper reports an empty node list.
+        ShareGroupDLQMetadataCacheHelper helper = mock(ShareGroupDLQMetadataCacheHelper.class);
+        when(helper.getClusterNodes()).thenReturn(List.of());
+        when(helper.containsTopic(DLQ_TOPIC)).thenReturn(false);
+        when(helper.isDlqAutoTopicCreateEnabled()).thenReturn(true);
+        when(helper.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
+        when(helper.shareGroupDlqTopic(GROUP_ID)).thenReturn(Optional.of(DLQ_TOPIC));
+
+        stateManager = builder().withCacheHelper(helper).build();
+        stateManager.start();
+        CompletableFuture<Void> future = stateManager.dlq(param());
+        Throwable failure = getCause(future);
+
+        assertNotNull(failure);
+        Class<?> expectedType = Errors.BROKER_NOT_AVAILABLE.exception().getClass();
+        assertEquals(expectedType, failure.getClass());
+    }
+
+    @Test
+    public void testDlqCreateTopicRetriesExhaustedFailsWithNetworkException() throws Exception {
+        final int maxAttempts = 3;
+        // The configured DLQ topic is reported as absent from metadata, which forces the
+        // create-topic path.
+        ShareGroupDLQMetadataCacheHelper helper = mock(ShareGroupDLQMetadataCacheHelper.class);
+        when(helper.getClusterNodes()).thenReturn(List.of(DEFAULT_LEADER));
+        when(helper.containsTopic(DLQ_TOPIC)).thenReturn(false);
+        when(helper.isDlqAutoTopicCreateEnabled()).thenReturn(true);
+        when(helper.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
+        when(helper.shareGroupDlqTopic(GROUP_ID)).thenReturn(Optional.of(DLQ_TOPIC));
+
+        // A real timer combined with the tiny backoffs passed to dlq() below lets the
+        // exhaustion path fire quickly.
+        String timerName = "shareGroupDLQTestTimer";
+        Timer realTimer = new SystemTimerReaper(timerName, new SystemTimer(timerName));
+        try {
+            MockClient.RequestMatcher isCreateTopics = body -> body instanceof CreateTopicsRequest;
+            MockClient mockClient = new MockClient(MOCK_TIME);
+            // One disconnect (null body, disconnected=true) per permitted attempt.
+            for (int remaining = maxAttempts; remaining > 0; remaining--) {
+                mockClient.prepareResponseFrom(isCreateTopics, null, DEFAULT_LEADER, true);
+            }
+
+            stateManager = builder()
+                .withClient(mockClient)
+                .withCacheHelper(helper)
+                .withTimer(realTimer)
+                .build();
+            stateManager.start();
+
+            CompletableFuture<Void> future = stateManager.dlq(param(), 1L, 5L, maxAttempts);
+            Throwable failure = getCause(future);
+            assertNotNull(failure);
+            Class<?> expectedType = Errors.NETWORK_EXCEPTION.exception().getClass();
+            assertEquals(expectedType, failure.getClass());
+        } finally {
+            Utils.closeQuietly(realTimer, "shareGroupDLQTestTimer");
+        }
+    }
+
+    @Test
+    public void testDlqCreateTopicPartialFailuresThenSucceeds() throws Exception {
+        final int maxAttempts = 3;
+        TopicPartitionData dlqTopicData = new TopicPartitionData(
+            DLQ_TOPIC,
+            Optional.of(1),
+            Optional.of(DLQ_TOPIC_ID),
+            List.of(DEFAULT_LEADER)
+        );
+        ShareGroupDLQMetadataCacheHelper helper = mock(ShareGroupDLQMetadataCacheHelper.class);
+        when(helper.containsTopic(DLQ_TOPIC)).thenReturn(false);
+        when(helper.isDlqAutoTopicCreateEnabled()).thenReturn(true);
+        when(helper.isDlqEnabledOnTopic(DLQ_TOPIC)).thenReturn(true);
+        when(helper.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
+        when(helper.shareGroupDlqTopic(GROUP_ID)).thenReturn(Optional.of(DLQ_TOPIC));
+        when(helper.getClusterNodes()).thenReturn(List.of(DEFAULT_LEADER));
+        when(helper.topicPartitionData(DLQ_TOPIC)).thenReturn(dlqTopicData);
+        when(helper.topicName(SOURCE_TOPIC_ID)).thenReturn(Optional.of("source-topic"));
+
+        String timerName = "shareGroupDLQTestTimer";
+        Timer realTimer = new SystemTimerReaper(timerName, new SystemTimer(timerName));
+        try {
+            List<ProduceRequest> sentProduces = new ArrayList<>();
+            MockClient.RequestMatcher isCreateTopics = body -> body instanceof CreateTopicsRequest;
+            MockClient.RequestMatcher recordProduce = body -> {
+                if (!(body instanceof ProduceRequest produce)) {
+                    return false;
+                }
+                sentProduces.add(produce);
+                return true;
+            };
+
+            MockClient mockClient = new MockClient(MOCK_TIME);
+            // Staged in order: two CreateTopics disconnects (retriable), one successful create,
+            // and finally a successful produce.
+            for (int disconnect = 0; disconnect < 2; disconnect++) {
+                mockClient.prepareResponseFrom(isCreateTopics, null, DEFAULT_LEADER, true);
+            }
+            mockClient.prepareResponseFrom(isCreateTopics, successfulCreateTopicsResponse(), DEFAULT_LEADER);
+            mockClient.prepareResponseFrom(recordProduce, successfulProduceResponse(0), DEFAULT_LEADER);
+
+            stateManager = builder()
+                .withClient(mockClient)
+                .withCacheHelper(helper)
+                .withTimer(realTimer)
+                .build();
+            stateManager.start();
+
+            CompletableFuture<Void> future = stateManager.dlq(param(), 1L, 5L, maxAttempts);
+            assertNull(future.get(5, TimeUnit.SECONDS));
+
+            assertEquals(1, sentProduces.size());
+            Map<String, String> sharedHeaders = Map.of(
+                HEADER_DLQ_ERRORS_TOPIC, "source-topic",
+                HEADER_DLQ_ERRORS_PARTITION, "0",
+                HEADER_DLQ_ERRORS_GROUP, GROUP_ID,
+                HEADER_DLQ_ERRORS_DELIVERY_COUNT, "1",
+                HEADER_DLQ_ERRORS_MESSAGE, "simulated cause"
+            );
+            assertDlqProduceRecordHeaders(
+                sentProduces.get(0),
+                Map.of(0, new ExpectedDlqPartition(0L, 2L, sharedHeaders))
+            );
+        } finally {
+            Utils.closeQuietly(realTimer, "shareGroupDLQTestTimer");
+        }
+    }
+
+    @Test
+    public void testDlqProducePartialFailuresThenSucceeds() throws Exception {
+        final int maxAttempts = 3;
+        String timerName = "shareGroupDLQTestTimer";
+        Timer realTimer = new SystemTimerReaper(timerName, new SystemTimer(timerName));
+        try {
+            // Every attempt is captured, failed ones included: the retried requests must carry
+            // the same headers as the attempt that finally succeeds.
+            List<ProduceRequest> sentProduces = new ArrayList<>();
+            MockClient.RequestMatcher recordProduce = body -> {
+                if (!(body instanceof ProduceRequest produce)) {
+                    return false;
+                }
+                sentProduces.add(produce);
+                return true;
+            };
+
+            MockClient mockClient = new MockClient(MOCK_TIME);
+            // Two produce disconnects (retriable) followed by one successful produce.
+            for (int disconnect = 0; disconnect < 2; disconnect++) {
+                mockClient.prepareResponseFrom(recordProduce, null, DEFAULT_LEADER, true);
+            }
+            mockClient.prepareResponseFrom(recordProduce, successfulProduceResponse(0), DEFAULT_LEADER);
+
+            stateManager = builder()
+                .withClient(mockClient)
+                .withTimer(realTimer)
+                .build();
+            stateManager.start();
+
+            CompletableFuture<Void> future = stateManager.dlq(param(), 1L, 5L, maxAttempts);
+            assertNull(future.get(5, TimeUnit.SECONDS));
+
+            assertEquals(maxAttempts, sentProduces.size());
+            Map<String, String> sharedHeaders = Map.of(
+                HEADER_DLQ_ERRORS_TOPIC, "source-topic",
+                HEADER_DLQ_ERRORS_PARTITION, "0",
+                HEADER_DLQ_ERRORS_GROUP, GROUP_ID,
+                HEADER_DLQ_ERRORS_DELIVERY_COUNT, "1",
+                HEADER_DLQ_ERRORS_MESSAGE, "simulated cause"
+            );
+            Map<Integer, ExpectedDlqPartition> expectedPerPartition =
+                Map.of(0, new ExpectedDlqPartition(0L, 2L, sharedHeaders));
+            for (ProduceRequest attempt : sentProduces) {
+                assertDlqProduceRecordHeaders(attempt, expectedPerPartition);
+            }
+        } finally {
+            Utils.closeQuietly(realTimer, "shareGroupDLQTestTimer");
+        }
+    }
+
+    @Test
+    public void testDlqProduceFatalErrorFailsFuture() throws Exception {
+        // The single prepared produce response carries INVALID_TOPIC_EXCEPTION.
+        MockClient.RequestMatcher isProduce = body -> body instanceof ProduceRequest;
+        MockClient mockClient = new MockClient(MOCK_TIME);
+        mockClient.prepareResponseFrom(
+            isProduce,
+            produceResponseWithError(Errors.INVALID_TOPIC_EXCEPTION),
+            DEFAULT_LEADER
+        );
+
+        stateManager = builder().withClient(mockClient).build();
+        stateManager.start();
+        CompletableFuture<Void> future = stateManager.dlq(param());
+        Throwable failure = getCause(future);
+
+        assertNotNull(failure);
+        Class<?> expectedType = Errors.INVALID_TOPIC_EXCEPTION.exception().getClass();
+        assertEquals(expectedType, failure.getClass());
+    }
+
+    @Test
+    public void testDlqProduceEmptyResponseFailsFuture() throws Exception {
+        // The produce response is built from a default ProduceResponseData, i.e. nothing is set.
+        ProduceResponse emptyResponse = new ProduceResponse(new ProduceResponseData());
+        MockClient.RequestMatcher isProduce = body -> body instanceof ProduceRequest;
+        MockClient mockClient = new MockClient(MOCK_TIME);
+        mockClient.prepareResponseFrom(isProduce, emptyResponse, DEFAULT_LEADER);
+
+        stateManager = builder().withClient(mockClient).build();
+        stateManager.start();
+        CompletableFuture<Void> future = stateManager.dlq(param());
+        Throwable failure = getCause(future);
+
+        assertNotNull(failure);
+        Class<?> expectedType = Errors.UNKNOWN_SERVER_ERROR.exception().getClass();
+        assertEquals(expectedType, failure.getClass());
+    }
+
+    @Test
+    public void testDlqProduceDisconnectIsRetriedNotImmediatelyFailed() throws Exception {
+        // A null response body with disconnected=true is meant to drive the wasDisconnected()
+        // branch in ShareGroupDLQStateManager#checkResponseError. A disconnect is retriable, so
+        // the future must NOT complete on the first attempt. Only the scheduling of the retry is
+        // verified here; waiting for full retry exhaustion could take ~30s because of the
+        // hard-coded exponential backoff in ShareGroupDLQStateManager.
+        MockClient.RequestMatcher isProduce = body -> body instanceof ProduceRequest;
+        MockClient mockClient = new MockClient(MOCK_TIME);
+        mockClient.prepareResponseFrom(isProduce, null, DEFAULT_LEADER, true);
+
+        stateManager = builder().withClient(mockClient).build();
+        stateManager.start();
+        CompletableFuture<Void> pending = stateManager.dlq(param());
+
+        // Wait briefly so the disconnect response can be processed. Timing out is the expected
+        // outcome: the retry is still pending, so the future has not completed either way.
+        boolean timedOut = false;
+        try {
+            pending.get(500, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException expected) {
+            timedOut = true;
+        }
+        if (!timedOut) {
+            fail("Expected the future to remain incomplete while retry is pending");
+        }
+        assertFalse(pending.isDone());
+    }
+
+    @Test
+    public void testDlqProduceRetriesExhaustedFailsWithNetworkException() throws Exception {
+        final int maxAttempts = 3;
+        // A real timer combined with the tiny backoffs passed to dlq() below lets the exhaustion
+        // path fire in a few ms, rather than the ~30s a production-like setup would take.
+        String timerName = "shareGroupDLQTestTimer";
+        Timer realTimer = new SystemTimerReaper(timerName, new SystemTimer(timerName));
+        try {
+            MockClient.RequestMatcher isProduce = body -> body instanceof ProduceRequest;
+            MockClient mockClient = new MockClient(MOCK_TIME);
+            // Each retry consumes one prepared response, so stage one disconnect per attempt.
+            for (int remaining = maxAttempts; remaining > 0; remaining--) {
+                mockClient.prepareResponseFrom(isProduce, null, DEFAULT_LEADER, true);
+            }
+
+            stateManager = builder()
+                .withClient(mockClient)
+                .withTimer(realTimer)
+                .build();
+            stateManager.start();
+
+            CompletableFuture<Void> future = stateManager.dlq(param(), 1L, 5L, maxAttempts);
+            Throwable failure = getCause(future);
+            assertNotNull(failure);
+            Class<?> expectedType = Errors.NETWORK_EXCEPTION.exception().getClass();
+            assertEquals(expectedType, failure.getClass());
+        } finally {
+            Utils.closeQuietly(realTimer, "shareGroupDLQTestTimer");
+        }
+    }
+
+    @Test
+    public void testDlqTwoEnqueuedRecordsBothComplete() throws Exception {
+        // Enqueues two DLQ parameters for source partitions 0 and 1 against a two-partition DLQ
+        // topic and checks that both futures complete and that the captured produce request(s)
+        // carry the expected headers.
+        ShareGroupDLQMetadataCacheHelper cacheHelper = 
mock(ShareGroupDLQMetadataCacheHelper.class);
+        
when(cacheHelper.shareGroupDlqTopic(GROUP_ID)).thenReturn(Optional.of(DLQ_TOPIC));
+        
when(cacheHelper.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
+        when(cacheHelper.containsTopic(DLQ_TOPIC)).thenReturn(true);
+        when(cacheHelper.isDlqEnabledOnTopic(DLQ_TOPIC)).thenReturn(true);
+        when(cacheHelper.isDlqAutoTopicCreateEnabled()).thenReturn(true);
+        
when(cacheHelper.getClusterNodes()).thenReturn(List.of(DEFAULT_LEADER));
+        
when(cacheHelper.topicName(SOURCE_TOPIC_ID)).thenReturn(Optional.of("source-topic"));
+        when(cacheHelper.topicPartitionData(DLQ_TOPIC)).thenReturn(new 
TopicPartitionData(
+            DLQ_TOPIC,
+            Optional.of(2),
+            Optional.of(DLQ_TOPIC_ID),
+            List.of(DEFAULT_LEADER, DEFAULT_LEADER)
+        ));
+
+        // Whether the two handlers end up coalesced into a single produce 
request or are sent as
+        // two separate requests depends on internal scheduling. Provide a 
multi-partition response
+        // that satisfies either case: each request will see partition indices 
0 and 1 in the
+        // response, and the handler picks out the index that matches its 
destination partition.
+        ProduceResponseData.TopicProduceResponse topicResp = new 
ProduceResponseData.TopicProduceResponse()
+            .setTopicId(DLQ_TOPIC_ID)
+            .setPartitionResponses(List.of(
+                new ProduceResponseData.PartitionProduceResponse()
+                    .setIndex(0)
+                    .setErrorCode(Errors.NONE.code()),
+                new ProduceResponseData.PartitionProduceResponse()
+                    .setIndex(1)
+                    .setErrorCode(Errors.NONE.code())
+            ));
+        ProduceResponseData.TopicProduceResponseCollection collection =
+            new ProduceResponseData.TopicProduceResponseCollection();
+        collection.add(topicResp);
+
+        MockClient client = new MockClient(MOCK_TIME);
+        List<ProduceRequest> capturedProduces = new ArrayList<>();
+        // Matcher that accepts any ProduceRequest and records it for later inspection.
+        MockClient.RequestMatcher captureProduce = body -> {
+            if (body instanceof ProduceRequest pr) {
+                capturedProduces.add(pr);
+                return true;
+            }
+            return false;
+        };
+        // Two identical responses cover the non-coalesced path.
+        client.prepareResponseFrom(captureProduce,
+            new ProduceResponse(new 
ProduceResponseData().setResponses(collection.duplicate())),
+            DEFAULT_LEADER);
+        client.prepareResponseFrom(captureProduce,
+            new ProduceResponse(new 
ProduceResponseData().setResponses(collection.duplicate())),
+            DEFAULT_LEADER);
+
+        stateManager = builder()
+            .withClient(client)
+            .withCacheHelper(cacheHelper)
+            .build();
+        stateManager.start();
+        // p0 and p1 differ only in the source partition (0 vs 1).
+        ShareGroupDLQRecordParameter p0 = new ShareGroupDLQRecordParameter(
+            GROUP_ID,
+            new TopicIdPartition(SOURCE_TOPIC_ID, 0, "source-topic"),
+            0L, 0L,
+            Optional.empty(), Optional.empty(), false);
+        ShareGroupDLQRecordParameter p1 = new ShareGroupDLQRecordParameter(
+            GROUP_ID,
+            new TopicIdPartition(SOURCE_TOPIC_ID, 1, "source-topic"),
+            0L, 0L,
+            Optional.empty(), Optional.empty(), false);
+
+        // Enqueue both before waiting on either, so the manager has the chance to coalesce them.
+        CompletableFuture<Void> r0 = stateManager.dlq(p0);
+        CompletableFuture<Void> r1 = stateManager.dlq(p1);
+
+        assertNull(r0.get(10, TimeUnit.SECONDS));
+        assertNull(r1.get(10, TimeUnit.SECONDS));
+
+        // Two source partitions map to two distinct DLQ partition indices (0 
and 1, given 2
+        // DLQ partitions). Whether they end up coalesced into a single 
request (one topic, two
+        // partitions) or sent as two requests (each with one partition) 
depends on scheduling.
+        ExpectedDlqPartition expectedDlqPartition0 = new 
ExpectedDlqPartition(0L, 0L, Map.of(
+            HEADER_DLQ_ERRORS_TOPIC, "source-topic",
+            HEADER_DLQ_ERRORS_PARTITION, "0",
+            HEADER_DLQ_ERRORS_GROUP, GROUP_ID
+        ));
+        ExpectedDlqPartition expectedDlqPartition1 = new 
ExpectedDlqPartition(0L, 0L, Map.of(
+            HEADER_DLQ_ERRORS_TOPIC, "source-topic",
+            HEADER_DLQ_ERRORS_PARTITION, "1",
+            HEADER_DLQ_ERRORS_GROUP, GROUP_ID
+        ));
+        if (capturedProduces.size() == 1) {
+            // Coalesced: the single request must contain both DLQ partitions.
+            assertDlqProduceRecordHeaders(capturedProduces.get(0), Map.of(
+                0, expectedDlqPartition0,
+                1, expectedDlqPartition1
+            ));
+        } else {
+            // Not coalesced: exactly two requests, each checked against the expectation for the
+            // DLQ partition index of its first partition entry.
+            assertEquals(2, capturedProduces.size(),
+                "Expected coalesced (1 request) or non-coalesced (2 requests), 
got " + capturedProduces.size());
+            for (ProduceRequest pr : capturedProduces) {
+                int dlqPartitionIndex = 
pr.data().topicData().iterator().next().partitionData().get(0).index();
+                ExpectedDlqPartition expected = dlqPartitionIndex == 0 ? 
expectedDlqPartition0 : expectedDlqPartition1;
+                assertDlqProduceRecordHeaders(pr, Map.of(dlqPartitionIndex, 
expected));
+            }
+        }
+    }
+
+    @Test
+    public void testDlqResolvesSourceTopicNameViaCacheHelperWhenMissing() throws Exception {
+        List<ProduceRequest> sentProduces = new ArrayList<>();
+        MockClient.RequestMatcher recordProduce = body -> {
+            if (!(body instanceof ProduceRequest produce)) {
+                return false;
+            }
+            sentProduces.add(produce);
+            return true;
+        };
+        MockClient mockClient = new MockClient(MOCK_TIME);
+        mockClient.prepareResponseFrom(recordProduce, successfulProduceResponse(0), DEFAULT_LEADER);
+
+        stateManager = builder().withClient(mockClient).build();
+        stateManager.start();
+
+        // The source topic name is deliberately null in the parameter.
+        TopicIdPartition unnamedSource = new TopicIdPartition(SOURCE_TOPIC_ID, 0, null);
+        ShareGroupDLQRecordParameter unnamedParam = new ShareGroupDLQRecordParameter(
+            GROUP_ID,
+            unnamedSource,
+            0L, 0L,
+            Optional.empty(), Optional.empty(), false);
+        CompletableFuture<Void> future = stateManager.dlq(unnamedParam);
+        assertNull(future.get(10, TimeUnit.SECONDS));
+
+        assertEquals(1, sentProduces.size());
+        // With no name supplied, the manager must have resolved it via
+        // ShareGroupDLQMetadataCacheHelper.topicName(SOURCE_TOPIC_ID), which the happy helper
+        // returns as "source-topic".
+        Map<String, String> sharedHeaders = Map.of(
+            HEADER_DLQ_ERRORS_TOPIC, "source-topic",
+            HEADER_DLQ_ERRORS_PARTITION, "0",
+            HEADER_DLQ_ERRORS_GROUP, GROUP_ID
+        );
+        assertDlqProduceRecordHeaders(
+            sentProduces.get(0),
+            Map.of(0, new ExpectedDlqPartition(0L, 0L, sharedHeaders))
+        );
+    }
+
+    // ---- Response builder helpers ----
+
+    private static ProduceResponse successfulProduceResponse(int partition) {
+        return produceResponseFor(partition, Errors.NONE);
+    }
+
+    private static ProduceResponse produceResponseWithError(Errors error) {
+        return produceResponseFor(0, error);
+    }
+
+    // Builds a ProduceResponse holding one topic entry (DLQ_TOPIC_ID) with a
+    // single partition response for the given index and error.
+    private static ProduceResponse produceResponseFor(int partition,
+                                                      Errors error) {
+        ProduceResponseData.PartitionProduceResponse partitionResp =
+            new ProduceResponseData.PartitionProduceResponse()
+                .setIndex(partition)
+                .setErrorCode(error.code())
+                .setErrorMessage(error.message());
+
+        // The name is intentionally left unset: the manager looks up the
+        // TopicProduceResponse using only topicId, which implies the
+        // lookup-key name is the default empty string.
+        ProduceResponseData.TopicProduceResponse topicResp =
+            new ProduceResponseData.TopicProduceResponse()
+                .setTopicId(DLQ_TOPIC_ID)
+                .setPartitionResponses(List.of(partitionResp));
+
+        ProduceResponseData.TopicProduceResponseCollection topics =
+            new ProduceResponseData.TopicProduceResponseCollection();
+        topics.add(topicResp);
+
+        ProduceResponseData payload =
+            new ProduceResponseData().setResponses(topics);
+        return new ProduceResponse(payload);
+    }
+
+    private static CreateTopicsResponse successfulCreateTopicsResponse() {
+        return createTopicsResponse(Errors.NONE);
+    }
+
+    // Builds a CreateTopicsResponse with a single result for DLQ_TOPIC
+    // (1 partition, replication factor 1) carrying the given error.
+    private static CreateTopicsResponse createTopicsResponse(Errors error) {
+        CreateTopicsResponseData.CreatableTopicResult topicResult =
+            new CreateTopicsResponseData.CreatableTopicResult()
+                .setName(DLQ_TOPIC)
+                .setTopicId(DLQ_TOPIC_ID)
+                .setNumPartitions(1)
+                .setReplicationFactor((short) 1)
+                .setErrorCode(error.code())
+                .setErrorMessage(error.message());
+
+        CreateTopicsResponseData payload = new CreateTopicsResponseData();
+        payload.topics().add(topicResult);
+        return new CreateTopicsResponse(payload);
+    }
+}

Reply via email to