This is an automated email from the ASF dual-hosted git repository.

apoorvmittal10 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 37d0f70e056 MINOR: Use DLQ topic for share group id to enable DLQ. 
(#22436)
37d0f70e056 is described below

commit 37d0f70e056ad51f78995dde706506b3324c789c
Author: Sushant Mahajan <[email protected]>
AuthorDate: Tue Jun 2 18:33:22 2026 +0530

    MINOR: Use DLQ topic for share group id to enable DLQ. (#22436)
    
    * Currently, the code in SharePartition only looks at the broker feature
    config `share.version=2` to enable DLQ flow. However, this is not
    correct.
    * Per KIP-1191
    > A share group can be configured with the name of a topic to be used as
    the group's DLQ topic. For a share group with a DLQ topic, when a
    record's delivery is rejected by the consumer ...
    * This implies that, in addition to the `share.version` feature gate, a
    DLQ topic must be configured on the share group for DLQ flows to start.
    * In light of this - appropriate changes have been made in
    SharePartition where the configProvider is leveraged to fetch the DLQ
    topic config applied to the share group. In case the topic name is
    empty, DLQ flow does not execute.
    * Tests have been added to reflect the same and existing tests updated
    with mock DLQ manager to assert on the enqueue calls.
    
    Reviewers: Apoorv Mittal <[email protected]>
---
 .../java/kafka/server/share/SharePartition.java    |  22 +--
 .../kafka/server/share/SharePartitionTest.java     | 176 +++++++++++++++++++++
 .../modern/share/ShareGroupConfigProvider.java     |  15 ++
 .../modern/share/ShareGroupConfigProviderTest.java |  23 +++
 4 files changed, 227 insertions(+), 9 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index 4232eaa4336..c8b4decd0e1 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -946,7 +946,7 @@ public class SharePartition {
                     continue;
                 }
 
-                InFlightState updateResult = 
inFlightBatch.tryUpdateBatchState(RecordState.ACQUIRED, 
DeliveryCountOps.INCREASE, maxDeliveryCount(), memberId, 
shareGroupDlqEnableSupplier.get());
+                InFlightState updateResult = 
inFlightBatch.tryUpdateBatchState(RecordState.ACQUIRED, 
DeliveryCountOps.INCREASE, maxDeliveryCount(), memberId, 
isDLQEnabledForGroup());
                 if (updateResult == null || updateResult.state() != 
RecordState.ACQUIRED) {
                     log.info("Unable to acquire records for the batch: {} in 
share partition: {}-{}",
                         inFlightBatch, groupId, topicIdPartition);
@@ -1138,7 +1138,7 @@ public class SharePartition {
                         DeliveryCountOps.NO_OP,
                         this.maxDeliveryCount(),
                         EMPTY_MEMBER_ID,
-                        shareGroupDlqEnableSupplier.get()
+                        isDLQEnabledForGroup()
                 );
                 if (updateResult == null) {
                     log.debug("Unable to release records from acquired state 
for the offset: {} in batch: {}"
@@ -1182,7 +1182,7 @@ public class SharePartition {
                     DeliveryCountOps.NO_OP,
                     this.maxDeliveryCount(),
                     EMPTY_MEMBER_ID,
-                    shareGroupDlqEnableSupplier.get()
+                    isDLQEnabledForGroup()
             );
             if (updateResult == null) {
                 log.debug("Unable to release records from acquired state for 
the batch: {}"
@@ -2004,7 +2004,7 @@ public class SharePartition {
                 }
 
                 InFlightState updateResult = 
offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, 
DeliveryCountOps.INCREASE,
-                    maxDeliveryCount, memberId, 
shareGroupDlqEnableSupplier.get());
+                    maxDeliveryCount, memberId, isDLQEnabledForGroup());
                 if (updateResult == null || updateResult.state() != 
RecordState.ACQUIRED) {
                     log.trace("Unable to acquire records for the offset: {} in 
batch: {}"
                             + " for the share partition: {}-{}", 
offsetState.getKey(), inFlightBatch,
@@ -2358,7 +2358,7 @@ public class SharePartition {
                         DeliveryCountOps.NO_OP,
                         this.maxDeliveryCount(),
                         EMPTY_MEMBER_ID,
-                        shareGroupDlqEnableSupplier.get()
+                        isDLQEnabledForGroup()
                     );
 
                     if (updateResult == null) {
@@ -2446,7 +2446,7 @@ public class SharePartition {
                 DeliveryCountOps.NO_OP,
                 this.maxDeliveryCount(),
                 EMPTY_MEMBER_ID,
-                shareGroupDlqEnableSupplier.get()
+                isDLQEnabledForGroup()
             );
             if (updateResult == null) {
                 log.debug("Unable to acknowledge records for the batch: {} 
with state: {}"
@@ -3023,7 +3023,7 @@ public class SharePartition {
                     DeliveryCountOps.NO_OP,
                     maxDeliveryCount(),
                     EMPTY_MEMBER_ID,
-                    shareGroupDlqEnableSupplier.get());
+                    isDLQEnabledForGroup());
             if (updateResult == null) {
                 log.error("Unable to release acquisition lock on timeout for 
the batch: {}"
                         + " for the share partition: {}-{} memberId: {}", 
inFlightBatch, groupId, topicIdPartition, memberId);
@@ -3089,7 +3089,7 @@ public class SharePartition {
                     DeliveryCountOps.NO_OP,
                     maxDeliveryCount(),
                     EMPTY_MEMBER_ID,
-                    shareGroupDlqEnableSupplier.get());
+                    isDLQEnabledForGroup());
             if (updateResult == null) {
                 log.error("Unable to release acquisition lock on timeout for 
the offset: {} in batch: {}"
                                 + " for the share partition: {}-{} memberId: 
{}", offsetState.getKey(), inFlightBatch,
@@ -3328,12 +3328,16 @@ public class SharePartition {
     }
 
     private RecordState recordStateWithDlq(byte ackType) {
-        if (shareGroupDlqEnableSupplier.get() && AcknowledgeType.REJECT.id == 
ackType) {
+        if (isDLQEnabledForGroup() && AcknowledgeType.REJECT.id == ackType) {
             return RecordState.ARCHIVING;
         }
         return ACK_TYPE_TO_RECORD_STATE.get(ackType);
     }
 
+    private boolean isDLQEnabledForGroup() {
+        return shareGroupDlqEnableSupplier.get() && 
configProvider.errorsDLQTopicName(groupId).isPresent();
+    }
+
     // Visible for testing.
 
     /**
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 4d7be150afa..45d07400923 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -59,6 +59,7 @@ import 
org.apache.kafka.server.share.PartitionMetadataProvider;
 import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
 import org.apache.kafka.server.share.dlq.NoOpShareGroupDLQManager;
 import org.apache.kafka.server.share.dlq.ShareGroupDLQManager;
+import org.apache.kafka.server.share.dlq.ShareGroupDLQRecordParameter;
 import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
@@ -12621,9 +12622,12 @@ public class SharePartitionTest {
 
     @Test
     public void testAcknowledgeRejectWithDlqEnabled() {
+        ShareGroupDLQManager dlqManager = mockDlqManager();
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withState(SharePartitionState.ACTIVE)
             .withShareGroupDlqEnableSupplier(() -> true)
+            .withConfigProvider(configProviderWithDlqTopic())
+            .withShareGroupDlqManager(dlqManager)
             .build();
 
         // Acquire 2 batches so that the first one stays in cache after being 
archived.
@@ -12655,13 +12659,18 @@ public class SharePartitionTest {
 
         // deliveryCompleteCount is 0 as evicted records are subtracted.
         assertEquals(0, sharePartition.deliveryCompleteCount());
+
+        // The rejected batch (offsets 5-9) is enqueued to the DLQ exactly 
once.
+        Mockito.verify(dlqManager, Mockito.times(1)).enqueue(Mockito.any());
     }
 
     @Test
     public void testAcknowledgeRejectWithDlqDisabled() {
+        ShareGroupDLQManager dlqManager = mockDlqManager();
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withState(SharePartitionState.ACTIVE)
             .withShareGroupDlqEnableSupplier(() -> false)
+            .withShareGroupDlqManager(dlqManager)
             .build();
 
         MemoryRecords records1 = memoryRecords(5, 5);
@@ -12685,13 +12694,99 @@ public class SharePartitionTest {
         assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(10L).batchState());
         assertEquals(10, sharePartition.startOffset());
         assertEquals(0, sharePartition.deliveryCompleteCount());
+
+        // DLQ is disabled, so the DLQ is never invoked.
+        Mockito.verify(dlqManager, Mockito.never()).enqueue(Mockito.any());
+    }
+
+    @Test
+    public void 
testAcknowledgeRejectWithDlqSupplierEnabledButNoDlqTopicConfigured() {
+        // The DLQ supplier returns true, but the group has no DLQ topic 
configured (the default
+        // ShareGroupConfigProvider returns an empty topic name). DLQ must 
therefore be treated as
+        // disabled: REJECT goes directly to ARCHIVED (no ARCHIVING 
intermediate state) and the DLQ
+        // is never invoked.
+        ShareGroupDLQManager dlqManager = mockDlqManager();
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withShareGroupDlqEnableSupplier(() -> true)
+            .withShareGroupDlqManager(dlqManager)
+            .build();
+
+        // Acquire 2 batches so that the first one stays in cache after being 
archived.
+        MemoryRecords records1 = memoryRecords(5, 5);
+        MemoryRecords records2 = memoryRecords(10, 5);
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition, records1, 5);
+        assertEquals(1, acquiredRecordsList.size());
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition, records2, 
5);
+        assertEquals(1, acquiredRecordsList.size());
+
+        // Acknowledge the first batch with REJECT.
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+            MEMBER_ID,
+            List.of(new ShareAcknowledgementBatch(5, 9, 
List.of(AcknowledgeType.REJECT.id))));
+        assertNull(ackResult.join());
+        assertFalse(ackResult.isCompletedExceptionally());
+
+        // Without an effective DLQ, REJECT goes directly to ARCHIVED and the 
batch at start offset
+        // is evicted from cache.
+        assertEquals(1, sharePartition.cachedState().size());
+        assertNull(sharePartition.cachedState().get(5L));
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(10L).batchState());
+        assertEquals(10, sharePartition.startOffset());
+        assertEquals(0, sharePartition.deliveryCompleteCount());
+
+        // The DLQ must not be invoked since no DLQ topic is configured for 
the group.
+        Mockito.verify(dlqManager, Mockito.never()).enqueue(Mockito.any());
+    }
+
+    @Test
+    public void 
testReleaseAcquiredRecordsMaxDeliveryWithDlqSupplierEnabledButNoDlqTopicConfigured()
 {
+        // The DLQ supplier returns true, but the group has no DLQ topic 
configured (the default
+        // ShareGroupConfigProvider returns an empty topic name). DLQ must 
therefore be treated as
+        // disabled: when the delivery count reaches the max on release, 
records go directly to
+        // ARCHIVED (no ARCHIVING intermediate state) and the DLQ is never 
invoked.
+        ShareGroupDLQManager dlqManager = mockDlqManager();
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withMaxDeliveryCount(2) // Only 2 delivery attempts will be made 
before archiving the records.
+            .withState(SharePartitionState.ACTIVE)
+            .withShareGroupDlqEnableSupplier(() -> true)
+            .withShareGroupDlqManager(dlqManager)
+            .build();
+
+        // Leading batch (offsets 0-9) stays acquired so the archived batch 
remains in cache.
+        fetchAcquiredRecords(sharePartition, memoryRecords(10), 10);
+
+        MemoryRecords records2 = memoryRecords(10, 5);
+        // First delivery attempt for offsets 10-14, delivery count becomes 1.
+        fetchAcquiredRecords(sharePartition, records2, 5);
+        // Release them back to AVAILABLE.
+        sharePartition.acknowledge(MEMBER_ID, List.of(
+            new ShareAcknowledgementBatch(10, 14, 
List.of(AcknowledgeType.RELEASE.id))));
+        // Second delivery attempt, delivery count reaches the max (2).
+        fetchAcquiredRecords(sharePartition, records2, 5);
+
+        // Release again. Delivery count has reached the max, so the records 
are archived.
+        CompletableFuture<Void> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        assertNull(releaseResult.join());
+        assertFalse(releaseResult.isCompletedExceptionally());
+
+        // Without an effective DLQ, the records go directly to ARCHIVED (not 
ARCHIVING).
+        assertEquals(2, sharePartition.cachedState().size());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(10L).batchState());
+        assertNull(sharePartition.cachedState().get(10L).offsetState());
+
+        // The DLQ must not be invoked since no DLQ topic is configured for 
the group.
+        Mockito.verify(dlqManager, Mockito.never()).enqueue(Mockito.any());
     }
 
     @Test
     public void testAcknowledgePerOffsetRejectWithDlqEnabled() {
+        ShareGroupDLQManager dlqManager = mockDlqManager();
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withState(SharePartitionState.ACTIVE)
             .withShareGroupDlqEnableSupplier(() -> true)
+            .withConfigProvider(configProviderWithDlqTopic())
+            .withShareGroupDlqManager(dlqManager)
             .build();
 
         // Acquire a batch with 5 records (offsets 0-4) and a second batch to 
keep cache populated.
@@ -12719,13 +12814,18 @@ public class SharePartitionTest {
         assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).batchState());
         assertEquals(5, sharePartition.startOffset());
         assertEquals(0, sharePartition.deliveryCompleteCount());
+
+        // Offsets 3 and 4 are rejected per-offset, so each is enqueued to the 
DLQ separately.
+        Mockito.verify(dlqManager, Mockito.times(2)).enqueue(Mockito.any());
     }
 
     @Test
     public void testAcknowledgePerOffsetRejectWithDlqDisabled() {
+        ShareGroupDLQManager dlqManager = mockDlqManager();
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withState(SharePartitionState.ACTIVE)
             .withShareGroupDlqEnableSupplier(() -> false)
+            .withShareGroupDlqManager(dlqManager)
             .build();
 
         MemoryRecords records1 = memoryRecords(5);
@@ -12751,15 +12851,21 @@ public class SharePartitionTest {
         assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).batchState());
         assertEquals(5, sharePartition.startOffset());
         assertEquals(0, sharePartition.deliveryCompleteCount());
+
+        // DLQ is disabled, so the DLQ is never invoked.
+        Mockito.verify(dlqManager, Mockito.never()).enqueue(Mockito.any());
     }
 
     @Test
     public void testAcquisitionLockTimeoutWithDlqEnabledCompleteBatch() throws 
InterruptedException {
+        ShareGroupDLQManager dlqManager = mockDlqManager();
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
             .withMaxDeliveryCount(2)
             .withState(SharePartitionState.ACTIVE)
             .withShareGroupDlqEnableSupplier(() -> true)
+            .withConfigProvider(configProviderWithDlqTopic())
+            .withShareGroupDlqManager(dlqManager)
             .build();
 
         // Acquire two batches so the first stays in cache after being 
archived.
@@ -12804,15 +12910,21 @@ public class SharePartitionTest {
         assertEquals(0, sharePartition.deliveryCompleteCount());
         // Second batch remains AVAILABLE.
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(10L).batchState());
+
+        // The archived complete batch (offsets 0-9) is enqueued to the DLQ 
exactly once.
+        Mockito.verify(dlqManager, Mockito.times(1)).enqueue(Mockito.any());
     }
 
     @Test
     public void testAcquisitionLockTimeoutWithDlqEnabledPerOffsetBatch() 
throws InterruptedException {
+        ShareGroupDLQManager dlqManager = mockDlqManager();
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
             .withMaxDeliveryCount(2)
             .withState(SharePartitionState.ACTIVE)
             .withShareGroupDlqEnableSupplier(() -> true)
+            .withConfigProvider(configProviderWithDlqTopic())
+            .withShareGroupDlqManager(dlqManager)
             .build();
 
         // Acquire a batch of 10 records (offsets 0-9).
@@ -12867,16 +12979,21 @@ public class SharePartitionTest {
         // Offsets 0-4 are ARCHIVED, 5-9 are AVAILABLE. Next fetch offset 
moves to 5
         // since offsets 0-4 are no longer fetchable.
         assertEquals(5, sharePartition.nextFetchOffset());
+
+        // Offsets 0-4 are archived per-offset, so each is enqueued to the DLQ 
separately.
+        Mockito.verify(dlqManager, Mockito.times(5)).enqueue(Mockito.any());
     }
 
     @Test
     public void testAcquisitionLockTimeoutWithDlqDisabledCompleteBatch() 
throws InterruptedException {
         // Verify that without DLQ, max delivery count still causes ARCHIVED 
(not ARCHIVING).
+        ShareGroupDLQManager dlqManager = mockDlqManager();
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
             .withMaxDeliveryCount(2)
             .withState(SharePartitionState.ACTIVE)
             .withShareGroupDlqEnableSupplier(() -> false)
+            .withShareGroupDlqManager(dlqManager)
             .build();
 
         // Acquire two batches.
@@ -12906,16 +13023,22 @@ public class SharePartitionTest {
         // Batch evicted, start offset advances.
         assertEquals(10, sharePartition.startOffset());
         assertEquals(0, sharePartition.deliveryCompleteCount());
+
+        // DLQ is disabled, so the DLQ is never invoked.
+        Mockito.verify(dlqManager, Mockito.never()).enqueue(Mockito.any());
     }
 
     @Test
     public void testAcquisitionLockTimeoutWithDlqEnabledMixedOffsets() throws 
InterruptedException {
         // Test where some offsets in a batch exceed max delivery count and 
some don't.
+        ShareGroupDLQManager dlqManager = mockDlqManager();
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
             .withMaxDeliveryCount(2)
             .withState(SharePartitionState.ACTIVE)
             .withShareGroupDlqEnableSupplier(() -> true)
+            .withConfigProvider(configProviderWithDlqTopic())
+            .withShareGroupDlqManager(dlqManager)
             .build();
 
         // Acquire batch of 10 records (offsets 0-9).
@@ -12954,12 +13077,16 @@ public class SharePartitionTest {
             DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
             () -> "Batch at offset 0 was not evicted after DLQ archival. Timer 
size: " +
                 sharePartition.timer().size() + ", cachedState keys: " + 
sharePartition.cachedState().keySet());
+
+        // Offsets 5-9 are archived per-offset, so each is enqueued to the DLQ 
separately.
+        Mockito.verify(dlqManager, Mockito.times(5)).enqueue(Mockito.any());
     }
 
     @Test
     public void 
testAcquisitionLockTimeoutWithDlqEnabledWriteFailureCompleteBatch() throws 
InterruptedException {
         // Phase 1 persist of ARCHIVING fails, but phase 2 still proceeds 
unconditionally
         // because timeout path uses tryUpdateState (no rollback).
+        ShareGroupDLQManager dlqManager = mockDlqManager();
         Persister persister = Mockito.mock(Persister.class);
         mockPersisterReadStateMethod(persister);
 
@@ -12976,6 +13103,8 @@ public class SharePartitionTest {
             .withMaxDeliveryCount(2)
             .withState(SharePartitionState.ACTIVE)
             .withShareGroupDlqEnableSupplier(() -> true)
+            .withConfigProvider(configProviderWithDlqTopic())
+            .withShareGroupDlqManager(dlqManager)
             .build();
 
         // Acquire two batches.
@@ -13013,11 +13142,15 @@ public class SharePartitionTest {
         assertEquals(0, sharePartition.deliveryCompleteCount());
         // Second batch went to AVAILABLE (delivery count 1 < max).
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(10L).batchState());
+
+        // The archived complete batch (offsets 0-9) is enqueued to the DLQ 
exactly once.
+        Mockito.verify(dlqManager, Mockito.times(1)).enqueue(Mockito.any());
     }
 
     @Test
     public void 
testAcquisitionLockTimeoutWithDlqEnabledWriteFailurePerOffsetBatch() throws 
InterruptedException {
         // Phase 1 persist of ARCHIVING fails for per-offset batch, but phase 
2 still proceeds.
+        ShareGroupDLQManager dlqManager = mockDlqManager();
         Persister persister = Mockito.mock(Persister.class);
         mockPersisterReadStateMethod(persister);
 
@@ -13034,6 +13167,8 @@ public class SharePartitionTest {
             .withMaxDeliveryCount(2)
             .withState(SharePartitionState.ACTIVE)
             .withShareGroupDlqEnableSupplier(() -> true)
+            .withConfigProvider(configProviderWithDlqTopic())
+            .withShareGroupDlqManager(dlqManager)
             .build();
 
         // Acquire batch of 10 records (offsets 0-9).
@@ -13082,12 +13217,16 @@ public class SharePartitionTest {
         // Despite both persists failing, offsets 0-4 reached ARCHIVED in 
memory (no rollback for timeout).
         // Offsets 5-9 remain AVAILABLE.
         assertEquals(5, sharePartition.nextFetchOffset());
+
+        // Offsets 0-4 are archived per-offset, so each is enqueued to the DLQ 
separately.
+        Mockito.verify(dlqManager, Mockito.times(5)).enqueue(Mockito.any());
     }
 
     @Test
     public void 
testAcquisitionLockTimeoutWithDlqPhase1FailsPhase2SucceedsCompleteBatch() 
throws InterruptedException {
         // Phase 1 persist (ARCHIVING) fails, phase 2 persist (ARCHIVED) 
succeeds.
         // Records should reach ARCHIVED despite phase 1 failure.
+        ShareGroupDLQManager dlqManager = mockDlqManager();
         Persister persister = Mockito.mock(Persister.class);
         mockPersisterReadStateMethod(persister);
 
@@ -13112,6 +13251,8 @@ public class SharePartitionTest {
             .withMaxDeliveryCount(2)
             .withState(SharePartitionState.ACTIVE)
             .withShareGroupDlqEnableSupplier(() -> true)
+            .withConfigProvider(configProviderWithDlqTopic())
+            .withShareGroupDlqManager(dlqManager)
             .build();
 
         // Acquire two batches.
@@ -13152,11 +13293,15 @@ public class SharePartitionTest {
         assertEquals(10, sharePartition.startOffset());
         assertEquals(0, sharePartition.deliveryCompleteCount());
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(10L).batchState());
+
+        // The archived complete batch (offsets 0-9) is enqueued to the DLQ 
exactly once.
+        Mockito.verify(dlqManager, Mockito.times(1)).enqueue(Mockito.any());
     }
 
     @Test
     public void 
testAcquisitionLockTimeoutWithDlqPhase1FailsPhase2SucceedsPerOffsetBatch() 
throws InterruptedException {
         // Phase 1 persist (ARCHIVING) fails, phase 2 persist (ARCHIVED) 
succeeds for per-offset batch.
+        ShareGroupDLQManager dlqManager = mockDlqManager();
         Persister persister = Mockito.mock(Persister.class);
         mockPersisterReadStateMethod(persister);
 
@@ -13181,6 +13326,8 @@ public class SharePartitionTest {
             .withMaxDeliveryCount(2)
             .withState(SharePartitionState.ACTIVE)
             .withShareGroupDlqEnableSupplier(() -> true)
+            .withConfigProvider(configProviderWithDlqTopic())
+            .withShareGroupDlqManager(dlqManager)
             .build();
 
         // Acquire batch of 10 records (offsets 0-9).
@@ -13233,6 +13380,9 @@ public class SharePartitionTest {
         // Phase 1 failed but phase 2 succeeded — offsets 0-4 reached ARCHIVED.
         // Offsets 5-9 remain AVAILABLE.
         assertEquals(5, sharePartition.nextFetchOffset());
+
+        // Offsets 0-4 are archived per-offset, so each is enqueued to the DLQ 
separately.
+        Mockito.verify(dlqManager, Mockito.times(5)).enqueue(Mockito.any());
     }
 
     // Unit tests for processDlqPhase2 method directly.
@@ -13264,10 +13414,12 @@ public class SharePartitionTest {
                 PartitionFactory.newPartitionErrorData(0, error.code(), 
error.message())))));
         
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeResult));
 
+        ShareGroupDLQManager dlqManager = mockDlqManager();
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withPersister(persister)
             .withState(SharePartitionState.ACTIVE)
             .withShareGroupDlqEnableSupplier(() -> true)
+            .withShareGroupDlqManager(dlqManager)
             .build();
 
         InFlightState state = new InFlightState(RecordState.ARCHIVING, 
deliveryCount, EMPTY_MEMBER_ID);
@@ -13277,6 +13429,16 @@ public class SharePartitionTest {
         assertEquals(expectedState, state.state());
         assertFalse(state.hasOngoingStateTransition());
 
+        // Verify the records were enqueued to the DLQ exactly once with the 
expected parameters.
+        ArgumentCaptor<ShareGroupDLQRecordParameter> dlqCaptor =
+            ArgumentCaptor.forClass(ShareGroupDLQRecordParameter.class);
+        Mockito.verify(dlqManager, 
Mockito.times(1)).enqueue(dlqCaptor.capture());
+        ShareGroupDLQRecordParameter dlqParam = dlqCaptor.getValue();
+        assertEquals(firstOffset, dlqParam.firstOffset());
+        assertEquals(lastOffset, dlqParam.lastOffset());
+        assertEquals(Optional.of(deliveryCount), dlqParam.deliveryCount());
+        assertEquals(Optional.ofNullable(dlqCause), dlqParam.cause());
+
         // Verify persister.writeState was called exactly once with the 
correct state batch.
         ArgumentCaptor<WriteShareGroupStateParameters> captor =
             ArgumentCaptor.forClass(WriteShareGroupStateParameters.class);
@@ -13302,6 +13464,20 @@ public class SharePartitionTest {
         Mockito.verify(persister, Mockito.never()).readState(Mockito.any());
     }
 
+    private static ShareGroupDLQManager mockDlqManager() {
+        ShareGroupDLQManager dlqManager = 
Mockito.mock(ShareGroupDLQManager.class);
+        
Mockito.when(dlqManager.enqueue(Mockito.any())).thenReturn(CompletableFuture.completedFuture(null));
+        return dlqManager;
+    }
+
+    private static ShareGroupConfigProvider configProviderWithDlqTopic() {
+        GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
+        GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
+        
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
+        
Mockito.when(groupConfig.errorsDLQTopicName()).thenReturn("test-dlq-topic");
+        return new ShareGroupConfigProvider(groupConfigManager);
+    }
+
     private static ShareGroupConfigProvider configProviderWithRenewDisabled() {
         ShareGroupConfigProvider configProvider = 
Mockito.mock(ShareGroupConfigProvider.class);
         
Mockito.when(configProvider.isRenewAcknowledgeEnabled(GROUP_ID)).thenReturn(false);
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java
index 868236f9c63..ef00633dbb7 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java
@@ -20,6 +20,8 @@ import org.apache.kafka.coordinator.group.GroupConfig;
 import org.apache.kafka.coordinator.group.GroupConfigManager;
 import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
 
+import java.util.Optional;
+
 /**
  * A provider that retrieves share group dynamic configuration values,
  * falling back to default values when group-specific configurations are not 
present.
@@ -98,4 +100,17 @@ public class ShareGroupConfigProvider {
             .flatMap(GroupConfig::shareAutoOffsetReset)
             .orElseGet(GroupConfig::defaultShareAutoOffsetReset);
     }
+
+    /**
+     * The method is used to get the name of the configured DLQ topic on the 
share group. If the group config
+     * is present, then the value from the group config is used. Otherwise, 
empty optional is returned.
+     *
+     * @param groupId The group id for which the DLQ topic name is to be 
fetched.
+     * @return Optional representing DLQ topic name for the share group, empty 
if not found.
+     */
+    public Optional<String> errorsDLQTopicName(String groupId) {
+        return manager.groupConfig(groupId)
+            .map(GroupConfig::errorsDLQTopicName)
+            .filter(val -> !val.isEmpty());
+    }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java
index a426b51554b..ba4905f63f3 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java
@@ -132,4 +132,27 @@ public class ShareGroupConfigProviderTest {
 
         assertEquals(GroupConfig.defaultShareAutoOffsetReset(), 
provider.autoOffsetReset("test-group"));
     }
+
+    @Test
+    void testShareGroupDLQTopicWithGroupConfig() {
+        String shareGroupDLQTopicName = "dlq.testGroupDLQTopic";
+        String shareGroupId = "test-group";
+        GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
+        GroupConfig groupConfig = mock(GroupConfig.class);
+        
when(groupConfig.errorsDLQTopicName()).thenReturn(shareGroupDLQTopicName);
+        
when(groupConfigManager.groupConfig(shareGroupId)).thenReturn(Optional.of(groupConfig));
+        provider = new ShareGroupConfigProvider(groupConfigManager);
+
+        assertEquals(Optional.of(shareGroupDLQTopicName), 
provider.errorsDLQTopicName(shareGroupId));
+    }
+
+    @Test
+    void testShareGroupDLQTopicWithoutGroupConfig() {
+        String shareGroupId = "test-group";
+        GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
+        
when(groupConfigManager.groupConfig(shareGroupId)).thenReturn(Optional.empty());
+        provider = new ShareGroupConfigProvider(groupConfigManager);
+
+        assertEquals(Optional.empty(), 
provider.errorsDLQTopicName(shareGroupId));
+    }
 }

Reply via email to