This is an automated email from the ASF dual-hosted git repository.

apoorvmittal10 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 69d4bfe9b93 KAFKA-20612: Stitch DLQ metrics with 
ShareGroupDLQStateManager. (#22412)
69d4bfe9b93 is described below

commit 69d4bfe9b93a8041f64375d0f4a47fd823cdc8ce
Author: Sushant Mahajan <[email protected]>
AuthorDate: Fri Jun 5 00:40:39 2026 +0530

    KAFKA-20612: Stitch DLQ metrics with ShareGroupDLQStateManager. (#22412)
    
    * Stitched `ShareGroupMetrics` with `ShareGroupDLQStateManager`. This
    includes 3 metrics defined in KIP-1191:
      * total records written - incremented on success response from produce
    RPC
      * produce requests total per group per sec - incremented for each new
    handler enqueued for produce
      * produce requests failed per group per sec - incremented for every
    handler with failed produce response
    * New tests have been added to `ShareGroupDLQStateManagerTest` to test
    batching and metrics.
    * Previously, `ShareGroupMetrics` was owned by `SharePartitionManager`;
    this has been changed. The owner is now `BrokerServer`, which passes the
    object to `SharePartitionManager` (SPM) and `ShareGroupDLQStateManager`
    (SGDSM). Closing the metrics is also done by `BrokerServer`.
    
    Reviewers: Apoorv Mittal <[email protected]>
---
 .../kafka/server/share/SharePartitionManager.java  |   4 +-
 .../src/main/scala/kafka/server/BrokerServer.scala |  13 +-
 .../server/share/SharePartitionManagerTest.java    |   1 -
 .../share/dlq/DefaultShareGroupDLQManager.java     |  21 +-
 .../share/dlq/ShareGroupDLQStateManager.java       |  45 +-
 .../share/dlq/ShareGroupDLQStateManagerTest.java   | 492 ++++++++++++++++++++-
 6 files changed, 560 insertions(+), 16 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java 
b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index a8eff11ac90..dcd2ccd65e9 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -191,6 +191,7 @@ public class SharePartitionManager implements AutoCloseable 
{
         long remoteFetchMaxWaitMs,
         Persister persister,
         ShareGroupConfigProvider configProvider,
+        ShareGroupMetrics shareGroupMetrics,
         BrokerTopicStats brokerTopicStats,
         Supplier<Boolean> shareGroupDlqEnableSupplier,
         ShareGroupDLQManager shareGroupDLQManager
@@ -208,7 +209,7 @@ public class SharePartitionManager implements AutoCloseable 
{
             remoteFetchMaxWaitMs,
             persister,
             configProvider,
-            new ShareGroupMetrics(time),
+            shareGroupMetrics,
             brokerTopicStats,
             shareGroupDlqEnableSupplier,
             shareGroupDLQManager
@@ -670,7 +671,6 @@ public class SharePartitionManager implements AutoCloseable 
{
     @Override
     public void close() throws Exception {
         this.timer.close();
-        this.shareGroupMetrics.close();
     }
 
     private ShareSessionKey shareSessionKey(String groupId, String memberId) {
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 63a0472ddf0..c339a1d595a 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -64,6 +64,7 @@ import 
org.apache.kafka.storage.internals.log.{LogDirFailureChannel, LogManager
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.apache.kafka.server.partition.{AlterPartitionManager, 
DefaultAlterPartitionManager}
 import org.apache.kafka.server.share.dlq.{DefaultShareGroupDLQManager, 
NoOpShareGroupDLQManager, ShareGroupDLQManager}
+import org.apache.kafka.server.share.metrics.ShareGroupMetrics
 
 import java.time.Duration
 import java.util
@@ -166,6 +167,8 @@ class BrokerServer(
 
   var clientMetricsManager: ClientMetricsManager = _
 
+  var shareGroupMetrics: ShareGroupMetrics = _
+
   var sharePartitionManager: SharePartitionManager = _
 
   var persister: Persister = _
@@ -390,6 +393,9 @@ class BrokerServer(
       /* create persister */
       persister = createShareStatePersister()
 
+      /* create metrics object to be shared with share DLQ manager and share 
partition manager */
+      shareGroupMetrics = new ShareGroupMetrics(time)
+
       /* create share group DLQ manager */
       shareGroupDLQManager = createShareGroupDLQManager()
 
@@ -472,6 +478,7 @@ class BrokerServer(
         config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong,
         persister,
         new ShareGroupConfigProvider(groupConfigManager),
+        shareGroupMetrics,
         brokerTopicStats,
         () => 
ShareVersion.fromFeatureLevel(metadataCache.features.finalizedFeatures.getOrDefault(ShareVersion.FEATURE_NAME,
 0.toShort)).supportsShareGroupDLQ(),
         shareGroupDLQManager
@@ -769,7 +776,8 @@ class BrokerServer(
           NetworkUtils.buildNetworkClient("ShareGroupDLQManager", config, 
metrics, Time.SYSTEM, new LogContext(s"[ShareGroupDLQManager 
broker=${config.brokerId}]")),
           new ShareCoordinatorMetadataCacheHelperImpl(metadataCache, key => 
shareCoordinator.partitionFor(key), config.interBrokerListenerName, 
groupConfigManager),
           Time.SYSTEM,
-          shareGroupTimer
+          shareGroupTimer,
+          shareGroupMetrics
         )
       } else if 
(klass.getName.equals(classOf[NoOpShareGroupDLQManager].getName)) {
         info("Using no-op share group DLQ manager")
@@ -921,6 +929,9 @@ class BrokerServer(
       if (shareGroupDLQManager != null)
         Utils.swallow(this.logger.underlying, () => 
shareGroupDLQManager.stop())
 
+      if (shareGroupMetrics != null)
+        Utils.swallow(this.logger.underlying, () => shareGroupMetrics.close())
+
       Utils.closeQuietly(shareGroupTimer, "share group timer")
 
       if (lifecycleManager != null)
diff --git 
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 3bde6678921..116a92f1bc9 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -1266,7 +1266,6 @@ public class SharePartitionManagerTest {
         sharePartitionManager.close();
         // Verify that the timer object in sharePartitionManager is closed by 
checking the calls to timer.close() and shareGroupMetrics.close().
         Mockito.verify(timer, times(1)).close();
-        Mockito.verify(shareGroupMetrics, times(1)).close();
     }
 
     @Test
diff --git 
a/server/src/main/java/org/apache/kafka/server/share/dlq/DefaultShareGroupDLQManager.java
 
b/server/src/main/java/org/apache/kafka/server/share/dlq/DefaultShareGroupDLQManager.java
index 47fed728f23..32e242b0ed1 100644
--- 
a/server/src/main/java/org/apache/kafka/server/share/dlq/DefaultShareGroupDLQManager.java
+++ 
b/server/src/main/java/org/apache/kafka/server/share/dlq/DefaultShareGroupDLQManager.java
@@ -19,6 +19,7 @@ package org.apache.kafka.server.share.dlq;
 
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.share.metrics.ShareGroupMetrics;
 import org.apache.kafka.server.util.timer.Timer;
 
 import org.slf4j.Logger;
@@ -39,14 +40,26 @@ public class DefaultShareGroupDLQManager implements 
ShareGroupDLQManager {
 
     private static final Logger log = 
LoggerFactory.getLogger(DefaultShareGroupDLQManager.class);
 
-    public static ShareGroupDLQManager instance(KafkaClient client, 
ShareGroupDLQMetadataCacheHelper cacheHelper, Time time, Timer timer) {
-        DefaultShareGroupDLQManager instance = new 
DefaultShareGroupDLQManager(client, cacheHelper, time, timer);
+    public static ShareGroupDLQManager instance(
+        KafkaClient client,
+        ShareGroupDLQMetadataCacheHelper cacheHelper,
+        Time time,
+        Timer timer,
+        ShareGroupMetrics metrics
+    ) {
+        DefaultShareGroupDLQManager instance = new 
DefaultShareGroupDLQManager(client, cacheHelper, time, timer, metrics);
         instance.start();
         return instance;
     }
 
-    private DefaultShareGroupDLQManager(KafkaClient client, 
ShareGroupDLQMetadataCacheHelper cacheHelper, Time time, Timer timer) {
-        this.stateManager = new ShareGroupDLQStateManager(client, cacheHelper, 
time, timer);
+    private DefaultShareGroupDLQManager(
+        KafkaClient client,
+        ShareGroupDLQMetadataCacheHelper cacheHelper,
+        Time time,
+        Timer timer,
+        ShareGroupMetrics shareGroupMetrics
+    ) {
+        this.stateManager = new ShareGroupDLQStateManager(client, cacheHelper, 
time, timer, shareGroupMetrics);
     }
 
     private void start() {
diff --git 
a/server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
 
b/server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
index 8613582ceea..b54878d7187 100644
--- 
a/server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
+++ 
b/server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
@@ -45,6 +45,7 @@ import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.internals.ExponentialBackoffManager;
 import org.apache.kafka.server.config.ServerConfigs;
+import org.apache.kafka.server.share.metrics.ShareGroupMetrics;
 import org.apache.kafka.server.util.InterBrokerSendThread;
 import org.apache.kafka.server.util.RequestAndCompletionHandler;
 import org.apache.kafka.server.util.timer.Timer;
@@ -56,6 +57,7 @@ import org.slf4j.LoggerFactory;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -80,6 +82,7 @@ public class ShareGroupDLQStateManager {
     private final Time time;
     private final Timer timer;
     private final ShareGroupDLQMetadataCacheHelper cacheHelper;
+    private final ShareGroupMetrics shareGroupMetrics;
     public static final long REQUEST_BACKOFF_MS = 1_000L;
     public static final long REQUEST_BACKOFF_MAX_MS = 30_000L;
     private static final int MAX_REQUEST_ATTEMPTS = 5;
@@ -91,7 +94,13 @@ public class ShareGroupDLQStateManager {
     private final Map<Node, List<ProduceRequestHandler>> nodeRPCMap = new 
HashMap<>();
     private final Object nodeMapLock = new Object();
 
-    public ShareGroupDLQStateManager(KafkaClient client, 
ShareGroupDLQMetadataCacheHelper cacheHelper, Time time, Timer timer) {
+    public ShareGroupDLQStateManager(
+        KafkaClient client,
+        ShareGroupDLQMetadataCacheHelper cacheHelper,
+        Time time,
+        Timer timer,
+        ShareGroupMetrics shareGroupMetrics
+    ) {
         if (client == null) {
             throw new IllegalArgumentException("Kafkaclient must not be 
null.");
         }
@@ -108,9 +117,14 @@ public class ShareGroupDLQStateManager {
             throw new IllegalArgumentException("Timer must not be null.");
         }
 
+        if (shareGroupMetrics == null) {
+            throw new IllegalArgumentException("ShareGroupMetrics must not be 
null.");
+        }
+
         this.time = time;
         this.timer = timer;
         this.cacheHelper = cacheHelper;
+        this.shareGroupMetrics = shareGroupMetrics;
         this.sender = new SendThread(
             "ShareGroupDLQSendThread",
             client,
@@ -153,6 +167,16 @@ public class ShareGroupDLQStateManager {
         return future;
     }
 
+    // Visibility for tests
+    Map<Node, List<ShareGroupDLQStateManager.ProduceRequestHandler>> 
nodeRPCMap() {
+        // Using Collections.unmodifiableMap and not Map.copyOf as we are 
looking for a quick
+        // immutable view of the map in the tests. The tests will invoke the
+        // method repeatedly to check the state of the map. Map.copyOf will 
create
+        // a separate copy of the map on every call and changes might get 
missed resulting
+        // in flakiness.
+        return Collections.unmodifiableMap(nodeRPCMap);
+    }
+
     private void enqueue(ProduceRequestHandler requestHandler) {
         sender.enqueue(requestHandler);
     }
@@ -332,6 +356,10 @@ public class ShareGroupDLQStateManager {
                 simpleRecords.toArray(new SimpleRecord[]{})
             );
 
+            // Update the metric to say a new request is created to be sent. 
This might not be the
+            // actual RPC count as we coalesce the requests before sending.
+            shareGroupMetrics.recordDLQProduce(param.groupId());
+
             return new ProduceRequestData.TopicProduceData()
                 .setName(dlqTopicPartitionData.topicName())
                 .setTopicId(dlqTopicPartitionData.topicId().get())
@@ -579,6 +607,7 @@ public class ShareGroupDLQStateManager {
                     switch (error) {
                         case NONE:
                             LOG.debug("Successfully produced records {} to dlq 
topic node {}.", this, dlqPartitionLeaderNode());
+                            
shareGroupMetrics.recordDLQRecordWrite(param.groupId(), (int) 
(param.lastOffset() - param.firstOffset() + 1));
                             produceRequestBackoff.resetAttempts();
                             this.result.complete(null);
                             break;
@@ -587,6 +616,7 @@ public class ShareGroupDLQStateManager {
                             LOG.debug("Received retriable error produce 
response for {} to dlq topic node {} - {}.", this, dlqPartitionLeaderNode(), 
errorMessage);
                             if (!produceRequestBackoff.canAttempt()) {
                                 LOG.error("Exhausted max retries to produce {} 
to  DLQ topic node {}.", this, dlqPartitionLeaderNode());
+                                
shareGroupMetrics.recordDLQProduceFailed(param.groupId());
                                 requestErrorResponse(new Exception("Exhausted 
max retries to produce to DLQ topic without success."));
                                 break;
                             }
@@ -597,6 +627,7 @@ public class ShareGroupDLQStateManager {
                             LOG.error("Unable to produce {} to DLQ topic node 
{} - {}.", this, dlqPartitionLeaderNode(), errorMessage);
                             
partitionResponse.recordErrors().forEach(recordError ->
                                 LOG.error("Records with errors {} - {}.", 
recordError.batchIndex(), recordError.batchIndexErrorMessage()));
+                            
shareGroupMetrics.recordDLQProduceFailed(param.groupId());
                             requestErrorResponse(error.exception());
                     }
                     break;
@@ -608,6 +639,7 @@ public class ShareGroupDLQStateManager {
                     if (!produceRequestBackoff.canAttempt()) {
                         LOG.error("Exhausted max retries to produce {} to  DLQ 
topic node {} due to client response error {}.",
                             param, dlqPartitionLeaderNode(), 
clientResponseErrorMessage);
+                        
shareGroupMetrics.recordDLQProduceFailed(param.groupId());
                         requestErrorResponse(clientResponseError.exception());
                         break;
                     }
@@ -617,6 +649,7 @@ public class ShareGroupDLQStateManager {
                 default:
                     LOG.error("Unable to produce {} to DLQ topic node {} due 
to client response error {}.",
                         param, dlqPartitionLeaderNode(), 
clientResponseErrorMessage);
+                    shareGroupMetrics.recordDLQProduceFailed(param.groupId());
                     requestErrorResponse(clientResponseError.exception());
             }
         }
@@ -771,13 +804,19 @@ public class ShareGroupDLQStateManager {
         }
     }
 
-    private record CoalesceResults(
+    // Visibility for tests
+    record CoalesceResults(
         AbstractRequest.Builder<? extends AbstractRequest> request,
         List<ProduceRequestHandler> liveHandlers
     ) {
     }
 
-    private static CoalesceResults 
coalesceProduceRequests(List<ProduceRequestHandler> handlers) {
+    // Visibility for tests
+    static CoalesceResults coalesceProduceRequests(List<ProduceRequestHandler> 
handlers) {
+        // Above handlers are destined for the same broker node - it could be 
for different DLQ topics and partitions
+        // but the same broker node. Now the produce request requires each 
topic data request to be
+        // scoped to a specific topic/topicId and the partition data could 
have all the record information
+        // and the destination DLQ partition. To accomplish this, we will map 
handlers by DLQ topic id.
         Map<Uuid, ProduceRequestData.TopicProduceData> produceHandlerMap = new 
HashMap<>();
         List<ProduceRequestHandler> liveHandlers = new 
ArrayList<>(handlers.size());
         handlers.forEach(handler -> {
diff --git 
a/server/src/test/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManagerTest.java
 
b/server/src/test/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManagerTest.java
index a6d939003ce..eac929140cd 100644
--- 
a/server/src/test/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManagerTest.java
+++ 
b/server/src/test/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManagerTest.java
@@ -37,24 +37,31 @@ import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import 
org.apache.kafka.server.share.dlq.ShareGroupDLQMetadataCacheHelper.TopicPartitionData;
+import org.apache.kafka.server.share.metrics.ShareGroupMetrics;
 import org.apache.kafka.server.util.MockTime;
 import org.apache.kafka.server.util.timer.MockTimer;
 import org.apache.kafka.server.util.timer.SystemTimer;
 import org.apache.kafka.server.util.timer.SystemTimerReaper;
 import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
@@ -73,7 +80,15 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.when;
 
 class ShareGroupDLQStateManagerTest {
@@ -87,6 +102,7 @@ class ShareGroupDLQStateManagerTest {
     private static final Node DEFAULT_LEADER = new Node(0, HOST, PORT);
 
     private final MockTimer mockTimer = new MockTimer(MOCK_TIME);
+    private final ShareGroupMetrics mockMetrics = 
mock(ShareGroupMetrics.class);
     private ShareGroupDLQStateManager stateManager;
 
     @AfterEach
@@ -101,6 +117,7 @@ class ShareGroupDLQStateManagerTest {
         private Time time = MOCK_TIME;
         private Timer timer;
         private ShareGroupDLQMetadataCacheHelper cacheHelper;
+        private ShareGroupMetrics shareGroupMetrics;
 
         Builder withClient(KafkaClient client) {
             this.client = client;
@@ -122,12 +139,20 @@ class ShareGroupDLQStateManagerTest {
             return this;
         }
 
+        Builder withShareGroupMetrics(ShareGroupMetrics shareGroupMetrics) {
+            this.shareGroupMetrics = shareGroupMetrics;
+            return this;
+        }
+
         ShareGroupDLQStateManager build() {
+            // Default to the test-class mockMetrics field so tests can verify 
interactions
+            // without having to thread a custom metrics mock through the 
builder.
             return new ShareGroupDLQStateManager(
                 client != null ? client : new MockClient(MOCK_TIME),
                 cacheHelper != null ? cacheHelper : 
happyCacheHelper(DEFAULT_LEADER),
                 time,
-                timer != null ? timer : mockTimer
+                timer != null ? timer : mockTimer,
+                shareGroupMetrics != null ? shareGroupMetrics : mockMetrics
             );
         }
     }
@@ -233,14 +258,14 @@ class ShareGroupDLQStateManagerTest {
     public void testConstructorRejectsNullClient() {
         ShareGroupDLQMetadataCacheHelper cacheHelper = 
mock(ShareGroupDLQMetadataCacheHelper.class);
         assertThrows(IllegalArgumentException.class,
-            () -> new ShareGroupDLQStateManager(null, cacheHelper, MOCK_TIME, 
mockTimer));
+            () -> new ShareGroupDLQStateManager(null, cacheHelper, MOCK_TIME, 
mockTimer, mockMetrics));
     }
 
     @Test
     public void testConstructorRejectsNullCacheHelper() {
         KafkaClient client = mock(KafkaClient.class);
         assertThrows(IllegalArgumentException.class,
-            () -> new ShareGroupDLQStateManager(client, null, MOCK_TIME, 
mockTimer));
+            () -> new ShareGroupDLQStateManager(client, null, MOCK_TIME, 
mockTimer, mockMetrics));
     }
 
     @Test
@@ -248,7 +273,7 @@ class ShareGroupDLQStateManagerTest {
         KafkaClient client = mock(KafkaClient.class);
         ShareGroupDLQMetadataCacheHelper cacheHelper = 
mock(ShareGroupDLQMetadataCacheHelper.class);
         assertThrows(IllegalArgumentException.class,
-            () -> new ShareGroupDLQStateManager(client, cacheHelper, null, 
mockTimer));
+            () -> new ShareGroupDLQStateManager(client, cacheHelper, null, 
mockTimer, mockMetrics));
     }
 
     @Test
@@ -256,7 +281,15 @@ class ShareGroupDLQStateManagerTest {
         KafkaClient client = mock(KafkaClient.class);
         ShareGroupDLQMetadataCacheHelper cacheHelper = 
mock(ShareGroupDLQMetadataCacheHelper.class);
         assertThrows(IllegalArgumentException.class,
-            () -> new ShareGroupDLQStateManager(client, cacheHelper, 
MOCK_TIME, null));
+            () -> new ShareGroupDLQStateManager(client, cacheHelper, 
MOCK_TIME, null, mockMetrics));
+    }
+
+    @Test
+    public void testConstructorRejectsNullShareGroupMetrics() {
+        KafkaClient client = mock(KafkaClient.class);
+        ShareGroupDLQMetadataCacheHelper cacheHelper = 
mock(ShareGroupDLQMetadataCacheHelper.class);
+        assertThrows(IllegalArgumentException.class,
+            () -> new ShareGroupDLQStateManager(client, cacheHelper, 
MOCK_TIME, mockTimer, null));
     }
 
     // ---- Lifecycle tests ----
@@ -268,12 +301,14 @@ class ShareGroupDLQStateManagerTest {
         stateManager.start();
         stateManager.start();
         // tearDown will call stateManager.stop() and must not throw.
+        verifyNoInteractions(mockMetrics);
     }
 
     @Test
     public void testStopWithoutStartIsNoOp() {
         stateManager = builder().build();
         // tearDown will call stateManager.stop() without a prior start() and 
must not throw.
+        verifyNoInteractions(mockMetrics);
     }
 
     // ---- DLQ topic validation tests (no thread start required) ----
@@ -288,6 +323,7 @@ class ShareGroupDLQStateManagerTest {
         Throwable cause = getCause(stateManager.dlq(param()));
         assertInstanceOf(ConfigException.class, cause);
         assertTrue(cause.getMessage().contains("empty"));
+        verifyNoInteractions(mockMetrics);
     }
 
     @Test
@@ -300,6 +336,7 @@ class ShareGroupDLQStateManagerTest {
         Throwable cause = getCause(stateManager.dlq(param()));
         assertInstanceOf(ConfigException.class, cause);
         assertTrue(cause.getMessage().contains("__"));
+        verifyNoInteractions(mockMetrics);
     }
 
     @Test
@@ -314,6 +351,7 @@ class ShareGroupDLQStateManagerTest {
         Throwable cause = getCause(stateManager.dlq(param()));
         assertInstanceOf(ConfigException.class, cause);
         assertTrue(cause.getMessage().contains("DLQ is not enabled"));
+        verifyNoInteractions(mockMetrics);
     }
 
     @Test
@@ -328,6 +366,7 @@ class ShareGroupDLQStateManagerTest {
         Throwable cause = getCause(stateManager.dlq(param()));
         assertInstanceOf(ConfigException.class, cause);
         assertTrue(cause.getMessage().contains("auto create is disabled"));
+        verifyNoInteractions(mockMetrics);
     }
 
     @Test
@@ -342,6 +381,7 @@ class ShareGroupDLQStateManagerTest {
         Throwable cause = getCause(stateManager.dlq(param()));
         assertInstanceOf(ConfigException.class, cause);
         assertTrue(cause.getMessage().contains("does not comply with the DLQ 
topic prefix"));
+        verifyNoInteractions(mockMetrics);
     }
 
     @Test
@@ -356,6 +396,7 @@ class ShareGroupDLQStateManagerTest {
         assertTrue(result.isDone());
         assertTrue(result.isCompletedExceptionally());
         assertFalse(result.isCancelled());
+        verifyNoInteractions(mockMetrics);
     }
 
     // ---- Full integration tests ----
@@ -390,6 +431,9 @@ class ShareGroupDLQStateManagerTest {
                 HEADER_DLQ_ERRORS_MESSAGE, "simulated cause"
             ))
         ));
+        verify(mockMetrics).recordDLQProduce(GROUP_ID);
+        verify(mockMetrics).recordDLQRecordWrite(GROUP_ID, 3);
+        verify(mockMetrics, never()).recordDLQProduceFailed(any());
     }
 
     @Test
@@ -440,6 +484,9 @@ class ShareGroupDLQStateManagerTest {
                 HEADER_DLQ_ERRORS_MESSAGE, "simulated cause"
             ))
         ));
+        verify(mockMetrics).recordDLQProduce(GROUP_ID);
+        verify(mockMetrics).recordDLQRecordWrite(GROUP_ID, 3);
+        verify(mockMetrics, never()).recordDLQProduceFailed(any());
     }
 
     @Test
@@ -495,6 +542,9 @@ class ShareGroupDLQStateManagerTest {
                 HEADER_DLQ_ERRORS_MESSAGE, "simulated cause"
             ))
         ));
+        verify(mockMetrics).recordDLQProduce(GROUP_ID);
+        verify(mockMetrics).recordDLQRecordWrite(GROUP_ID, 3);
+        verify(mockMetrics, never()).recordDLQProduceFailed(any());
     }
 
     @Test
@@ -521,6 +571,8 @@ class ShareGroupDLQStateManagerTest {
         Throwable cause = getCause(stateManager.dlq(param()));
         assertNotNull(cause);
         assertEquals(Errors.INVALID_REPLICATION_FACTOR.exception().getClass(), 
cause.getClass());
+        // CreateTopics failed; produce was never attempted, so no DLQ metrics 
should fire.
+        verifyNoInteractions(mockMetrics);
     }
 
     @Test
@@ -537,6 +589,8 @@ class ShareGroupDLQStateManagerTest {
         Throwable cause = getCause(stateManager.dlq(param()));
         assertNotNull(cause);
         assertEquals(Errors.BROKER_NOT_AVAILABLE.exception().getClass(), 
cause.getClass());
+        // No cluster node was available to send CreateTopics to; produce 
never attempted.
+        verifyNoInteractions(mockMetrics);
     }
 
     @Test
@@ -574,6 +628,8 @@ class ShareGroupDLQStateManagerTest {
             Throwable cause = getCause(stateManager.dlq(param(), 1L, 5L, 
maxAttempts));
             assertNotNull(cause);
             assertEquals(Errors.NETWORK_EXCEPTION.exception().getClass(), 
cause.getClass());
+            // CreateTopics retries exhausted; produce never attempted, so no 
DLQ metrics should fire.
+            verifyNoInteractions(mockMetrics);
         } finally {
             Utils.closeQuietly(realTimer, "shareGroupDLQTestTimer");
         }
@@ -641,6 +697,10 @@ class ShareGroupDLQStateManagerTest {
                     HEADER_DLQ_ERRORS_MESSAGE, "simulated cause"
                 ))
             ));
+            // CreateTopics retried, but produce only ran once (after the 
eventual create success).
+            verify(mockMetrics).recordDLQProduce(GROUP_ID);
+            verify(mockMetrics).recordDLQRecordWrite(GROUP_ID, 3);
+            verify(mockMetrics, never()).recordDLQProduceFailed(any());
         } finally {
             Utils.closeQuietly(realTimer, "shareGroupDLQTestTimer");
         }
@@ -688,6 +748,11 @@ class ShareGroupDLQStateManagerTest {
             for (ProduceRequest pr : capturedProduces) {
                 assertDlqProduceRecordHeaders(pr, expectedByPartition);
             }
+            // Each attempt (including retries) goes through generateRequests, 
so recordDLQProduce
+            // is invoked once per attempt. recordDLQRecordWrite fires only on 
the final success.
+            verify(mockMetrics, times(maxAttempts)).recordDLQProduce(GROUP_ID);
+            verify(mockMetrics).recordDLQRecordWrite(GROUP_ID, 3);
+            verify(mockMetrics, never()).recordDLQProduceFailed(any());
         } finally {
             Utils.closeQuietly(realTimer, "shareGroupDLQTestTimer");
         }
@@ -707,6 +772,9 @@ class ShareGroupDLQStateManagerTest {
         Throwable cause = getCause(stateManager.dlq(param()));
         assertNotNull(cause);
         assertEquals(Errors.INVALID_TOPIC_EXCEPTION.exception().getClass(), 
cause.getClass());
+        verify(mockMetrics).recordDLQProduce(GROUP_ID);
+        verify(mockMetrics).recordDLQProduceFailed(GROUP_ID);
+        verify(mockMetrics, never()).recordDLQRecordWrite(any(), anyInt());
     }
 
     @Test
@@ -723,6 +791,11 @@ class ShareGroupDLQStateManagerTest {
         Throwable cause = getCause(stateManager.dlq(param()));
         assertNotNull(cause);
         assertEquals(Errors.UNKNOWN_SERVER_ERROR.exception().getClass(), 
cause.getClass());
+        verify(mockMetrics).recordDLQProduce(GROUP_ID);
+        // Empty produce-response paths return UNKNOWN_SERVER_ERROR via 
requestErrorResponse without
+        // touching recordDLQProduceFailed (that's only invoked on retry 
exhaustion and from the inner/outer default cases).
+        verify(mockMetrics, never()).recordDLQProduceFailed(any());
+        verify(mockMetrics, never()).recordDLQRecordWrite(any(), anyInt());
     }
 
     @Test
@@ -751,6 +824,11 @@ class ShareGroupDLQStateManagerTest {
         } catch (TimeoutException expected) {
             assertFalse(result.isDone());
         }
+        // The first attempt did go out to the wire, so recordDLQProduce fired 
once. The retry is
+        // still pending in the timer, so neither success nor failure metrics 
should have landed.
+        verify(mockMetrics, times(1)).recordDLQProduce(GROUP_ID);
+        verify(mockMetrics, never()).recordDLQRecordWrite(any(), anyInt());
+        verify(mockMetrics, never()).recordDLQProduceFailed(any());
     }
 
     @Test
@@ -781,6 +859,53 @@ class ShareGroupDLQStateManagerTest {
             Throwable cause = getCause(stateManager.dlq(param(), 1L, 5L, 
maxAttempts));
             assertNotNull(cause);
             assertEquals(Errors.NETWORK_EXCEPTION.exception().getClass(), 
cause.getClass());
+            // NETWORK_EXCEPTION exhaustion now records the failure metric 
(production records it
+            // on the canAttempt()==false branch before requestErrorResponse). 
Only the final
+            // exhaustion records the failure - the earlier retried attempts 
don't.
+            verify(mockMetrics, times(maxAttempts)).recordDLQProduce(GROUP_ID);
+            verify(mockMetrics).recordDLQProduceFailed(GROUP_ID);
+            verify(mockMetrics, never()).recordDLQRecordWrite(any(), anyInt());
+        } finally {
+            Utils.closeQuietly(realTimer, "shareGroupDLQTestTimer");
+        }
+    }
+
+    @Test
+    public void testDlqProduceNotLeaderOrFollowerExhaustsAndRecordsFailure() 
throws Exception {
+        int maxAttempts = 3;
+        // Real timer with tiny backoffs keeps the exhaustion path within 
milliseconds.
+        Timer realTimer = new SystemTimerReaper("shareGroupDLQTestTimer",
+            new SystemTimer("shareGroupDLQTestTimer"));
+        try {
+            MockClient client = new MockClient(MOCK_TIME);
+            // Each attempt returns a partition-level NOT_LEADER_OR_FOLLOWER 
on partition 0,
+            // which is retriable up to maxAttempts.
+            for (int i = 0; i < maxAttempts; i++) {
+                client.prepareResponseFrom(
+                    body -> body instanceof ProduceRequest,
+                    produceResponseWithError(Errors.NOT_LEADER_OR_FOLLOWER),
+                    DEFAULT_LEADER
+                );
+            }
+
+            stateManager = builder()
+                .withClient(client)
+                .withTimer(realTimer)
+                .build();
+            stateManager.start();
+
+            Throwable cause = getCause(stateManager.dlq(param(), 1L, 5L, 
maxAttempts));
+            assertNotNull(cause);
+            // The exhaustion path raises a generic Exception (not 
Errors.NOT_LEADER_OR_FOLLOWER's
+            // typed exception), so just check the message rather than the 
class.
+            assertTrue(cause.getMessage().contains("Exhausted max retries"));
+
+            // Each attempt is its own outgoing produce request -> 
recordDLQProduce fires
+            // maxAttempts times. The final attempt exhausts retries and 
records the failure
+            // (the new inner NOT_LEADER_OR_FOLLOWER exhaustion branch).
+            verify(mockMetrics, times(maxAttempts)).recordDLQProduce(GROUP_ID);
+            verify(mockMetrics).recordDLQProduceFailed(GROUP_ID);
+            verify(mockMetrics, never()).recordDLQRecordWrite(any(), anyInt());
         } finally {
             Utils.closeQuietly(realTimer, "shareGroupDLQTestTimer");
         }
@@ -887,6 +1012,13 @@ class ShareGroupDLQStateManagerTest {
                 assertDlqProduceRecordHeaders(pr, Map.of(dlqPartitionIndex, 
expected));
             }
         }
+        // Each handler succeeds with 1 record (offsets 0..0). 
recordDLQProduce is now invoked
+        // once per handler inside topicProduceData() (not deduped 
per-group-per-request as it
+        // was previously in coalesceProduceRequests), so two handlers always 
produce two calls
+        // regardless of whether the SendThread coalesces them into a single 
produce request.
+        verify(mockMetrics, times(2)).recordDLQRecordWrite(GROUP_ID, 1);
+        verify(mockMetrics, times(2)).recordDLQProduce(GROUP_ID);
+        verify(mockMetrics, never()).recordDLQProduceFailed(any());
     }
 
     @Test
@@ -925,8 +1057,358 @@ class ShareGroupDLQStateManagerTest {
                 HEADER_DLQ_ERRORS_GROUP, GROUP_ID
             ))
         ));
+        verify(mockMetrics).recordDLQProduce(GROUP_ID);
+        verify(mockMetrics).recordDLQRecordWrite(GROUP_ID, 1);
+        verify(mockMetrics, never()).recordDLQProduceFailed(any());
     }
 
+    @Test
+    public void testDlqMultipleGroupsWithMixedOutcomes() throws Exception {
+        // Three share groups write to one shared DLQ topic that has 3 partitions on a single
+        // leader. Source partition N lands on DLQ partition N (the sourcePartition % numPartitions
+        // mapping in populateDLQTopicData), i.e. group-a -> 0, group-b -> 1, group-c -> 2.
+        final String groupA = "group-a";
+        final String groupB = "group-b";
+        final String groupC = "group-c";
+        final String sourceTopic = "source-topic";
+
+        ShareGroupDLQMetadataCacheHelper helper = mock(ShareGroupDLQMetadataCacheHelper.class);
+        when(helper.topicPartitionData(DLQ_TOPIC)).thenReturn(new TopicPartitionData(
+            DLQ_TOPIC,
+            Optional.of(3),
+            Optional.of(DLQ_TOPIC_ID),
+            List.of(DEFAULT_LEADER, DEFAULT_LEADER, DEFAULT_LEADER)
+        ));
+        when(helper.topicName(SOURCE_TOPIC_ID)).thenReturn(Optional.of(sourceTopic));
+        when(helper.getClusterNodes()).thenReturn(List.of(DEFAULT_LEADER));
+        when(helper.shareGroupDlqTopic(anyString())).thenReturn(Optional.of(DLQ_TOPIC));
+        when(helper.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
+        when(helper.containsTopic(DLQ_TOPIC)).thenReturn(true);
+        when(helper.isDlqEnabledOnTopic(DLQ_TOPIC)).thenReturn(true);
+        when(helper.isDlqAutoTopicCreateEnabled()).thenReturn(true);
+
+        // Every prepared response answers all three partitions (0 = NONE,
+        // 1 = INVALID_TOPIC_EXCEPTION, 2 = NONE). Whether the SendThread ends up sending one
+        // coalesced request, two, or three, each handler can locate its own partition in it.
+        ProduceResponseData.PartitionProduceResponse partitionZeroOk =
+            new ProduceResponseData.PartitionProduceResponse()
+                .setIndex(0)
+                .setErrorCode(Errors.NONE.code());
+        ProduceResponseData.PartitionProduceResponse partitionOneInvalidTopic =
+            new ProduceResponseData.PartitionProduceResponse()
+                .setIndex(1)
+                .setErrorCode(Errors.INVALID_TOPIC_EXCEPTION.code())
+                .setErrorMessage(Errors.INVALID_TOPIC_EXCEPTION.message());
+        ProduceResponseData.PartitionProduceResponse partitionTwoOk =
+            new ProduceResponseData.PartitionProduceResponse()
+                .setIndex(2)
+                .setErrorCode(Errors.NONE.code());
+
+        ProduceResponseData.TopicProduceResponseCollection topicResponses =
+            new ProduceResponseData.TopicProduceResponseCollection();
+        topicResponses.add(new ProduceResponseData.TopicProduceResponse()
+            .setTopicId(DLQ_TOPIC_ID)
+            .setPartitionResponses(List.of(partitionZeroOk, partitionOneInvalidTopic, partitionTwoOk)));
+        ProduceResponse mixedOutcome =
+            new ProduceResponse(new ProduceResponseData().setResponses(topicResponses));
+
+        // Prepare one response per possible outgoing request (1 to 3 depending on coalescing);
+        // any prepared response that is never consumed is harmless.
+        MockClient client = new MockClient(MOCK_TIME);
+        int responsesToPrepare = 3;
+        while (responsesToPrepare-- > 0) {
+            client.prepareResponseFrom(body -> body instanceof ProduceRequest, mixedOutcome, DEFAULT_LEADER);
+        }
+
+        stateManager = builder().withClient(client).withCacheHelper(helper).build();
+        stateManager.start();
+
+        ShareGroupDLQRecordParameter paramA = new ShareGroupDLQRecordParameter(
+            groupA,
+            new TopicIdPartition(SOURCE_TOPIC_ID, 0, sourceTopic),
+            0L, 0L,
+            Optional.empty(), Optional.empty(), false);
+        ShareGroupDLQRecordParameter paramB = new ShareGroupDLQRecordParameter(
+            groupB,
+            new TopicIdPartition(SOURCE_TOPIC_ID, 1, sourceTopic),
+            0L, 0L,
+            Optional.empty(), Optional.empty(), false);
+        ShareGroupDLQRecordParameter paramC = new ShareGroupDLQRecordParameter(
+            groupC,
+            new TopicIdPartition(SOURCE_TOPIC_ID, 2, sourceTopic),
+            0L, 0L,
+            Optional.empty(), Optional.empty(), false);
+
+        CompletableFuture<Void> futureA = stateManager.dlq(paramA);
+        CompletableFuture<Void> futureB = stateManager.dlq(paramB);
+        CompletableFuture<Void> futureC = stateManager.dlq(paramC);
+
+        // group-a and group-c complete normally; group-b fails with the partition-level error.
+        assertNull(futureA.get(5, TimeUnit.SECONDS));
+        Throwable failureB = getCause(futureB);
+        assertEquals(Errors.INVALID_TOPIC_EXCEPTION.exception().getClass(), failureB.getClass());
+        assertNull(futureC.get(5, TimeUnit.SECONDS));
+
+        // One handler per group, one attempt per handler (nothing on this path retries), and
+        // recordDLQProduce fires once per handler inside topicProduceData(): exactly one produce
+        // metric per group no matter how the SendThread coalesced the requests.
+        verify(mockMetrics).recordDLQProduce(groupA);
+        verify(mockMetrics).recordDLQProduce(groupB);
+        verify(mockMetrics).recordDLQProduce(groupC);
+
+        // Record-write counts are reported for the successful groups only.
+        verify(mockMetrics).recordDLQRecordWrite(groupA, 1);
+        verify(mockMetrics).recordDLQRecordWrite(groupC, 1);
+        verify(mockMetrics, never()).recordDLQRecordWrite(eq(groupB), anyInt());
+
+        // Only group-b (the INVALID_TOPIC_EXCEPTION partition) reports a failed produce. The
+        // metric is recorded before the future completes, so it is already visible here.
+        verify(mockMetrics).recordDLQProduceFailed(groupB);
+        verify(mockMetrics, never()).recordDLQProduceFailed(groupA);
+        verify(mockMetrics, never()).recordDLQProduceFailed(groupC);
+
+        // Aggregate sanity check across all groups: 3 produce calls in total versus 1 failed
+        // produce call, i.e. some requests failed and failed < total.
+        long produceCount = Mockito.mockingDetails(mockMetrics).getInvocations().stream()
+            .map(inv -> inv.getMethod().getName())
+            .filter("recordDLQProduce"::equals)
+            .count();
+        long produceFailedCount = Mockito.mockingDetails(mockMetrics).getInvocations().stream()
+            .map(inv -> inv.getMethod().getName())
+            .filter("recordDLQProduceFailed"::equals)
+            .count();
+        assertEquals(3, produceCount);
+        assertEquals(1, produceFailedCount);
+        assertTrue(produceFailedCount < produceCount,
+            "Expected recordDLQProduceFailed count (" + produceFailedCount + ") < recordDLQProduce count (" + produceCount + ")");
+    }
+
+    @Test
+    public void testMultipleAccumulatedHandlersInNodeRPCMap() throws Exception {
+        // Verifies batching: several dlq() calls aimed at the same leader must pile up as
+        // multiple handlers under that node's entry in nodeRPCMap.
+        MockClient client = new MockClient(MOCK_TIME);
+        client.prepareResponseFrom(body -> true, null, DEFAULT_LEADER);
+
+        stateManager = builder().withClient(client).withCacheHelper(happyCacheHelper(DEFAULT_LEADER)).build();
+
+        // The executor is created only once nothing else can fail before the try block, and it is
+        // always torn down in finally so a failing start()/dlq() cannot leak the polling thread.
+        ExecutorService executor = Executors.newFixedThreadPool(1);
+        try {
+            // Background poller: spins (bounded by DEFAULT_MAX_WAIT_MS) until it sees more than
+            // 2 handlers queued for DEFAULT_LEADER. It also stops as soon as it is interrupted,
+            // which lets shutdownNow() end it promptly when the test fails early.
+            Future<Boolean> done = executor.submit(() -> {
+                long start = System.currentTimeMillis();
+                while (!Thread.currentThread().isInterrupted()
+                    && System.currentTimeMillis() - start <= TestUtils.DEFAULT_MAX_WAIT_MS) {
+                    List<ShareGroupDLQStateManager.ProduceRequestHandler> handlers =
+                        stateManager.nodeRPCMap().get(DEFAULT_LEADER);
+                    if (handlers != null && handlers.size() > 2) {
+                        return true;
+                    }
+                }
+                return false;
+            });
+
+            stateManager.start();
+
+            // Multiple dlq() calls for the same group. They all target the same leader
+            // (DEFAULT_LEADER), so after the first iteration marks that node as in-flight, the
+            // rest accumulate in nodeRPCMap and never get cleared.
+            for (int i = 0; i < 10; i++) {
+                stateManager.dlq(new ShareGroupDLQRecordParameter(
+                    GROUP_ID,
+                    new TopicIdPartition(SOURCE_TOPIC_ID, 0, "source-topic"),
+                    0L, 0L,
+                    Optional.empty(), Optional.empty(), false));
+            }
+
+            // Wait until the background poller reports that more than 2 handlers piled up.
+            // done.get() blocks until the poller finishes, so the poll interval is nominal.
+            TestUtils.waitForCondition(done::get, TestUtils.DEFAULT_MAX_WAIT_MS, 10L,
+                () -> "unable to verify batching");
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+
+    // ---- Direct unit tests for coalesceProduceRequests ----
+
+    @Test
+    public void testCoalesceProduceRequestsWithEmptyHandlerListProducesEmptyRequest() {
+        // Coalescing zero handlers must yield no live handlers and a produce request that can
+        // still be built but carries no topic data.
+        ShareGroupDLQStateManager.CoalesceResults coalesced =
+            ShareGroupDLQStateManager.coalesceProduceRequests(List.of());
+        assertTrue(coalesced.liveHandlers().isEmpty());
+
+        ProduceRequest.Builder requestBuilder = (ProduceRequest.Builder) coalesced.request();
+        ProduceRequest built = requestBuilder.build();
+        assertTrue(built.data().topicData().isEmpty());
+    }
+
+    @Test
+    public void testCoalesceProduceRequestsWithSingleHandlerProducesOneTopicOnePartition() throws Exception {
+        // A single populated handler must survive coalescing and contribute exactly one topic
+        // entry holding exactly one partition (index 0).
+        stateManager = builder().build();
+        ShareGroupDLQStateManager.ProduceRequestHandler onlyHandler =
+            newHandlerForCoalesceTest(stateManager, GROUP_ID, 0);
+        onlyHandler.populateDLQTopicData();
+
+        ShareGroupDLQStateManager.CoalesceResults coalesced =
+            ShareGroupDLQStateManager.coalesceProduceRequests(List.of(onlyHandler));
+        assertEquals(List.of(onlyHandler), coalesced.liveHandlers());
+
+        ProduceRequest built = ((ProduceRequest.Builder) coalesced.request()).build();
+        assertEquals(1, built.data().topicData().size());
+
+        ProduceRequestData.TopicProduceData onlyTopic = built.data().topicData().iterator().next();
+        assertEquals(DLQ_TOPIC_ID, onlyTopic.topicId());
+        assertEquals(1, onlyTopic.partitionData().size());
+        assertEquals(0, onlyTopic.partitionData().get(0).index());
+
+        // Side effect of topicProduceData(): recordDLQProduce is recorded once per handler.
+        verify(mockMetrics).recordDLQProduce(GROUP_ID);
+    }
+
+    @Test
+    public void testCoalesceProduceRequestsMergesPartitionsForSameDlqTopic() throws Exception {
+        // One DLQ topic with 2 partitions, both led by DEFAULT_LEADER. Handlers for source
+        // partitions 0 and 1 therefore map to DLQ partitions 0 and 1 of the same topic.
+        ShareGroupDLQMetadataCacheHelper helper = mock(ShareGroupDLQMetadataCacheHelper.class);
+        when(helper.topicPartitionData(DLQ_TOPIC)).thenReturn(new TopicPartitionData(
+            DLQ_TOPIC,
+            Optional.of(2),
+            Optional.of(DLQ_TOPIC_ID),
+            List.of(DEFAULT_LEADER, DEFAULT_LEADER)
+        ));
+        when(helper.shareGroupDlqTopic(anyString())).thenReturn(Optional.of(DLQ_TOPIC));
+        when(helper.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
+        when(helper.containsTopic(DLQ_TOPIC)).thenReturn(true);
+        when(helper.isDlqEnabledOnTopic(DLQ_TOPIC)).thenReturn(true);
+        when(helper.isDlqAutoTopicCreateEnabled()).thenReturn(true);
+
+        stateManager = builder().withCacheHelper(helper).build();
+        ShareGroupDLQStateManager.ProduceRequestHandler first =
+            newHandlerForCoalesceTest(stateManager, GROUP_ID, 0);
+        ShareGroupDLQStateManager.ProduceRequestHandler second =
+            newHandlerForCoalesceTest(stateManager, GROUP_ID, 1);
+        first.populateDLQTopicData();
+        second.populateDLQTopicData();
+
+        ShareGroupDLQStateManager.CoalesceResults coalesced =
+            ShareGroupDLQStateManager.coalesceProduceRequests(List.of(first, second));
+        assertEquals(List.of(first, second), coalesced.liveHandlers());
+
+        ProduceRequest built = ((ProduceRequest.Builder) coalesced.request()).build();
+        assertEquals(1, built.data().topicData().size(),
+            "Both handlers share a DLQ topic so they should coalesce into a single topic entry");
+
+        ProduceRequestData.TopicProduceData mergedTopic = built.data().topicData().iterator().next();
+        assertEquals(DLQ_TOPIC_ID, mergedTopic.topicId());
+
+        // The merged topic entry must carry both destination partitions.
+        Set<Integer> seenPartitions = new HashSet<>();
+        for (ProduceRequestData.PartitionProduceData partition : mergedTopic.partitionData()) {
+            seenPartitions.add(partition.index());
+        }
+        assertEquals(Set.of(0, 1), seenPartitions);
+
+        // The metric lives in topicProduceData(), so it is recorded once per handler.
+        verify(mockMetrics, times(2)).recordDLQProduce(GROUP_ID);
+    }
+
+    @Test
+    public void testCoalesceProduceRequestsKeepsDifferentDlqTopicsSeparate() throws Exception {
+        // Two groups, each configured with its own single-partition DLQ topic on the same leader.
+        // Coalescing must keep the two topics as separate entries in the produce request.
+        final String groupA = "group-a";
+        final String groupB = "group-b";
+        final String dlqTopicA = "dlq-topic-a";
+        final String dlqTopicB = "dlq-topic-b";
+        final Uuid dlqTopicAId = Uuid.randomUuid();
+        final Uuid dlqTopicBId = Uuid.randomUuid();
+
+        ShareGroupDLQMetadataCacheHelper helper = mock(ShareGroupDLQMetadataCacheHelper.class);
+        // Settings common to both groups.
+        when(helper.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
+        when(helper.isDlqEnabledOnTopic(anyString())).thenReturn(true);
+        when(helper.isDlqAutoTopicCreateEnabled()).thenReturn(true);
+        // group-a -> dlq-topic-a
+        when(helper.shareGroupDlqTopic(groupA)).thenReturn(Optional.of(dlqTopicA));
+        when(helper.containsTopic(dlqTopicA)).thenReturn(true);
+        when(helper.topicPartitionData(dlqTopicA)).thenReturn(new TopicPartitionData(
+            dlqTopicA, Optional.of(1), Optional.of(dlqTopicAId), List.of(DEFAULT_LEADER)));
+        // group-b -> dlq-topic-b
+        when(helper.shareGroupDlqTopic(groupB)).thenReturn(Optional.of(dlqTopicB));
+        when(helper.containsTopic(dlqTopicB)).thenReturn(true);
+        when(helper.topicPartitionData(dlqTopicB)).thenReturn(new TopicPartitionData(
+            dlqTopicB, Optional.of(1), Optional.of(dlqTopicBId), List.of(DEFAULT_LEADER)));
+
+        stateManager = builder().withCacheHelper(helper).build();
+        ShareGroupDLQStateManager.ProduceRequestHandler handlerA =
+            newHandlerForCoalesceTest(stateManager, groupA, 0);
+        ShareGroupDLQStateManager.ProduceRequestHandler handlerB =
+            newHandlerForCoalesceTest(stateManager, groupB, 0);
+        handlerA.populateDLQTopicData();
+        handlerB.populateDLQTopicData();
+
+        ShareGroupDLQStateManager.CoalesceResults coalesced =
+            ShareGroupDLQStateManager.coalesceProduceRequests(List.of(handlerA, handlerB));
+        assertEquals(List.of(handlerA, handlerB), coalesced.liveHandlers());
+
+        ProduceRequest built = ((ProduceRequest.Builder) coalesced.request()).build();
+        assertEquals(2, built.data().topicData().size(),
+            "Different DLQ topic ids must remain in separate TopicProduceData entries");
+
+        Set<Uuid> seenTopicIds = built.data().topicData().stream()
+            .map(ProduceRequestData.TopicProduceData::topicId)
+            .collect(Collectors.toSet());
+        assertEquals(Set.of(dlqTopicAId, dlqTopicBId), seenTopicIds);
+
+        // One produce metric per group, since each group contributed one handler.
+        verify(mockMetrics).recordDLQProduce(groupA);
+        verify(mockMetrics).recordDLQProduce(groupB);
+    }
+
+    @Test
+    public void testCoalesceProduceRequestsSkipsHandlerWhoseTopicProduceDataThrows() throws Exception {
+        stateManager = builder().build();
+
+        CompletableFuture<Void> healthyFuture = new CompletableFuture<>();
+        CompletableFuture<Void> faultyFuture = new CompletableFuture<>();
+
+        // Healthy handler: populateDLQTopicData() is invoked below, so topicProduceData() works.
+        ShareGroupDLQRecordParameter healthyParam = new ShareGroupDLQRecordParameter(GROUP_ID,
+            new TopicIdPartition(SOURCE_TOPIC_ID, 0, "source-topic"),
+            0L, 0L,
+            Optional.empty(), Optional.empty(), false);
+        ShareGroupDLQStateManager.ProduceRequestHandler healthy = stateManager.new ProduceRequestHandler(
+            healthyParam,
+            healthyFuture,
+            ShareGroupDLQStateManager.REQUEST_BACKOFF_MS,
+            ShareGroupDLQStateManager.REQUEST_BACKOFF_MAX_MS,
+            3);
+        healthy.populateDLQTopicData();
+
+        // Faulty handler: populateDLQTopicData() is deliberately skipped, which leaves
+        // dlqTopicPartitionData null and makes topicProduceData() throw an NPE. The expectation
+        // is that coalesceProduceRequests catches it, fails the handler's future through
+        // requestErrorResponse, and leaves the handler out of liveHandlers.
+        ShareGroupDLQRecordParameter faultyParam = new ShareGroupDLQRecordParameter(GROUP_ID,
+            new TopicIdPartition(SOURCE_TOPIC_ID, 0, "source-topic"),
+            0L, 0L,
+            Optional.empty(), Optional.empty(), false);
+        ShareGroupDLQStateManager.ProduceRequestHandler faulty = stateManager.new ProduceRequestHandler(
+            faultyParam,
+            faultyFuture,
+            ShareGroupDLQStateManager.REQUEST_BACKOFF_MS,
+            ShareGroupDLQStateManager.REQUEST_BACKOFF_MAX_MS,
+            3);
+
+        ShareGroupDLQStateManager.CoalesceResults coalesced =
+            ShareGroupDLQStateManager.coalesceProduceRequests(List.of(healthy, faulty));
+
+        assertEquals(List.of(healthy), coalesced.liveHandlers(),
+            "Only the handler whose topicProduceData() succeeded should appear in liveHandlers");
+        assertFalse(healthyFuture.isDone(), "Good handler's future must be untouched by coalesce");
+        assertTrue(faultyFuture.isCompletedExceptionally(),
+            "Broken handler's future must be completed exceptionally");
+        Throwable failure = getCause(faultyFuture);
+        assertInstanceOf(NullPointerException.class, failure);
+
+        // The coalesced request still carries the healthy handler's topic data.
+        ProduceRequest built = ((ProduceRequest.Builder) coalesced.request()).build();
+        assertEquals(1, built.data().topicData().size());
+        assertEquals(DLQ_TOPIC_ID, built.data().topicData().iterator().next().topicId());
+    }
+
+    // Creates a ProduceRequestHandler bound to the given manager for the coalesce tests. The
+    // handler targets source partition `sourcePartition` of "source-topic" for `groupId`, owns a
+    // fresh CompletableFuture, and uses the manager's default backoff constants with 3 attempts.
+    // populateDLQTopicData() is NOT called here; callers invoke it when they need it.
+    private static ShareGroupDLQStateManager.ProduceRequestHandler newHandlerForCoalesceTest(
+        ShareGroupDLQStateManager manager,
+        String groupId,
+        int sourcePartition
+    ) {
+        TopicIdPartition source = new TopicIdPartition(SOURCE_TOPIC_ID, sourcePartition, "source-topic");
+        return manager.new ProduceRequestHandler(
+            new ShareGroupDLQRecordParameter(
+                groupId,
+                source,
+                0L, 0L,
+                Optional.empty(), Optional.empty(), false),
+            new CompletableFuture<>(),
+            ShareGroupDLQStateManager.REQUEST_BACKOFF_MS,
+            ShareGroupDLQStateManager.REQUEST_BACKOFF_MAX_MS,
+            3);
+    }
+
+
     // ---- Response builder helpers ----
 
     private static ProduceResponse successfulProduceResponse(int partition) {

Reply via email to