This is an automated email from the ASF dual-hosted git repository.
apoorvmittal10 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 37d0f70e056 MINOR: Use DLQ topic for share group id to enable DLQ.
(#22436)
37d0f70e056 is described below
commit 37d0f70e056ad51f78995dde706506b3324c789c
Author: Sushant Mahajan <[email protected]>
AuthorDate: Tue Jun 2 18:33:22 2026 +0530
MINOR: Use DLQ topic for share group id to enable DLQ. (#22436)
* Currently, the code in SharePartition only looks at the broker feature
config `share.version=2` to enable DLQ flow. However, this is not
correct.
* Per KIP-1191
> A share group can be configured with the name of a topic to be used as
the group's DLQ topic. For a share group with a DLQ topic, when a
record's delivery is rejected by the consumer ...
* This implies that both the feature gate and a DLQ topic configured on
the share group are ESSENTIAL for DLQ flows to start.
* In light of this - appropriate changes have been made in
SharePartition where the configProvider is leveraged to fetch the DLQ
topic config applied to the share group. In case the topic name is
empty, DLQ flow does not execute.
* Tests have been added to reflect the same, and existing tests have been
updated with a mock DLQ manager to assert on the enqueue calls.
Reviewers: Apoorv Mittal <[email protected]>
---
.../java/kafka/server/share/SharePartition.java | 22 +--
.../kafka/server/share/SharePartitionTest.java | 176 +++++++++++++++++++++
.../modern/share/ShareGroupConfigProvider.java | 15 ++
.../modern/share/ShareGroupConfigProviderTest.java | 23 +++
4 files changed, 227 insertions(+), 9 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index 4232eaa4336..c8b4decd0e1 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -946,7 +946,7 @@ public class SharePartition {
continue;
}
- InFlightState updateResult =
inFlightBatch.tryUpdateBatchState(RecordState.ACQUIRED,
DeliveryCountOps.INCREASE, maxDeliveryCount(), memberId,
shareGroupDlqEnableSupplier.get());
+ InFlightState updateResult =
inFlightBatch.tryUpdateBatchState(RecordState.ACQUIRED,
DeliveryCountOps.INCREASE, maxDeliveryCount(), memberId,
isDLQEnabledForGroup());
if (updateResult == null || updateResult.state() !=
RecordState.ACQUIRED) {
log.info("Unable to acquire records for the batch: {} in
share partition: {}-{}",
inFlightBatch, groupId, topicIdPartition);
@@ -1138,7 +1138,7 @@ public class SharePartition {
DeliveryCountOps.NO_OP,
this.maxDeliveryCount(),
EMPTY_MEMBER_ID,
- shareGroupDlqEnableSupplier.get()
+ isDLQEnabledForGroup()
);
if (updateResult == null) {
log.debug("Unable to release records from acquired state
for the offset: {} in batch: {}"
@@ -1182,7 +1182,7 @@ public class SharePartition {
DeliveryCountOps.NO_OP,
this.maxDeliveryCount(),
EMPTY_MEMBER_ID,
- shareGroupDlqEnableSupplier.get()
+ isDLQEnabledForGroup()
);
if (updateResult == null) {
log.debug("Unable to release records from acquired state for
the batch: {}"
@@ -2004,7 +2004,7 @@ public class SharePartition {
}
InFlightState updateResult =
offsetState.getValue().tryUpdateState(RecordState.ACQUIRED,
DeliveryCountOps.INCREASE,
- maxDeliveryCount, memberId,
shareGroupDlqEnableSupplier.get());
+ maxDeliveryCount, memberId, isDLQEnabledForGroup());
if (updateResult == null || updateResult.state() !=
RecordState.ACQUIRED) {
log.trace("Unable to acquire records for the offset: {} in
batch: {}"
+ " for the share partition: {}-{}",
offsetState.getKey(), inFlightBatch,
@@ -2358,7 +2358,7 @@ public class SharePartition {
DeliveryCountOps.NO_OP,
this.maxDeliveryCount(),
EMPTY_MEMBER_ID,
- shareGroupDlqEnableSupplier.get()
+ isDLQEnabledForGroup()
);
if (updateResult == null) {
@@ -2446,7 +2446,7 @@ public class SharePartition {
DeliveryCountOps.NO_OP,
this.maxDeliveryCount(),
EMPTY_MEMBER_ID,
- shareGroupDlqEnableSupplier.get()
+ isDLQEnabledForGroup()
);
if (updateResult == null) {
log.debug("Unable to acknowledge records for the batch: {}
with state: {}"
@@ -3023,7 +3023,7 @@ public class SharePartition {
DeliveryCountOps.NO_OP,
maxDeliveryCount(),
EMPTY_MEMBER_ID,
- shareGroupDlqEnableSupplier.get());
+ isDLQEnabledForGroup());
if (updateResult == null) {
log.error("Unable to release acquisition lock on timeout for
the batch: {}"
+ " for the share partition: {}-{} memberId: {}",
inFlightBatch, groupId, topicIdPartition, memberId);
@@ -3089,7 +3089,7 @@ public class SharePartition {
DeliveryCountOps.NO_OP,
maxDeliveryCount(),
EMPTY_MEMBER_ID,
- shareGroupDlqEnableSupplier.get());
+ isDLQEnabledForGroup());
if (updateResult == null) {
log.error("Unable to release acquisition lock on timeout for
the offset: {} in batch: {}"
+ " for the share partition: {}-{} memberId:
{}", offsetState.getKey(), inFlightBatch,
@@ -3328,12 +3328,16 @@ public class SharePartition {
}
private RecordState recordStateWithDlq(byte ackType) {
- if (shareGroupDlqEnableSupplier.get() && AcknowledgeType.REJECT.id ==
ackType) {
+ if (isDLQEnabledForGroup() && AcknowledgeType.REJECT.id == ackType) {
return RecordState.ARCHIVING;
}
return ACK_TYPE_TO_RECORD_STATE.get(ackType);
}
+ private boolean isDLQEnabledForGroup() {
+ return shareGroupDlqEnableSupplier.get() &&
configProvider.errorsDLQTopicName(groupId).isPresent();
+ }
+
// Visible for testing.
/**
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 4d7be150afa..45d07400923 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -59,6 +59,7 @@ import
org.apache.kafka.server.share.PartitionMetadataProvider;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.dlq.NoOpShareGroupDLQManager;
import org.apache.kafka.server.share.dlq.ShareGroupDLQManager;
+import org.apache.kafka.server.share.dlq.ShareGroupDLQRecordParameter;
import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
@@ -12621,9 +12622,12 @@ public class SharePartitionTest {
@Test
public void testAcknowledgeRejectWithDlqEnabled() {
+ ShareGroupDLQManager dlqManager = mockDlqManager();
SharePartition sharePartition = SharePartitionBuilder.builder()
.withState(SharePartitionState.ACTIVE)
.withShareGroupDlqEnableSupplier(() -> true)
+ .withConfigProvider(configProviderWithDlqTopic())
+ .withShareGroupDlqManager(dlqManager)
.build();
// Acquire 2 batches so that the first one stays in cache after being
archived.
@@ -12655,13 +12659,18 @@ public class SharePartitionTest {
// deliveryCompleteCount is 0 as evicted records are subtracted.
assertEquals(0, sharePartition.deliveryCompleteCount());
+
+ // The rejected batch (offsets 5-9) is enqueued to the DLQ exactly
once.
+ Mockito.verify(dlqManager, Mockito.times(1)).enqueue(Mockito.any());
}
@Test
public void testAcknowledgeRejectWithDlqDisabled() {
+ ShareGroupDLQManager dlqManager = mockDlqManager();
SharePartition sharePartition = SharePartitionBuilder.builder()
.withState(SharePartitionState.ACTIVE)
.withShareGroupDlqEnableSupplier(() -> false)
+ .withShareGroupDlqManager(dlqManager)
.build();
MemoryRecords records1 = memoryRecords(5, 5);
@@ -12685,13 +12694,99 @@ public class SharePartitionTest {
assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(10L).batchState());
assertEquals(10, sharePartition.startOffset());
assertEquals(0, sharePartition.deliveryCompleteCount());
+
+ // DLQ is disabled, so the DLQ is never invoked.
+ Mockito.verify(dlqManager, Mockito.never()).enqueue(Mockito.any());
+ }
+
+ @Test
+ public void
testAcknowledgeRejectWithDlqSupplierEnabledButNoDlqTopicConfigured() {
+ // The DLQ supplier returns true, but the group has no DLQ topic
configured (the default
+ // ShareGroupConfigProvider returns an empty topic name). DLQ must
therefore be treated as
+ // disabled: REJECT goes directly to ARCHIVED (no ARCHIVING
intermediate state) and the DLQ
+ // is never invoked.
+ ShareGroupDLQManager dlqManager = mockDlqManager();
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .withShareGroupDlqEnableSupplier(() -> true)
+ .withShareGroupDlqManager(dlqManager)
+ .build();
+
+ // Acquire 2 batches so that the first one stays in cache after being
archived.
+ MemoryRecords records1 = memoryRecords(5, 5);
+ MemoryRecords records2 = memoryRecords(10, 5);
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition, records1, 5);
+ assertEquals(1, acquiredRecordsList.size());
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition, records2,
5);
+ assertEquals(1, acquiredRecordsList.size());
+
+ // Acknowledge the first batch with REJECT.
+ CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+ MEMBER_ID,
+ List.of(new ShareAcknowledgementBatch(5, 9,
List.of(AcknowledgeType.REJECT.id))));
+ assertNull(ackResult.join());
+ assertFalse(ackResult.isCompletedExceptionally());
+
+ // Without an effective DLQ, REJECT goes directly to ARCHIVED and the
batch at start offset
+ // is evicted from cache.
+ assertEquals(1, sharePartition.cachedState().size());
+ assertNull(sharePartition.cachedState().get(5L));
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(10L).batchState());
+ assertEquals(10, sharePartition.startOffset());
+ assertEquals(0, sharePartition.deliveryCompleteCount());
+
+ // The DLQ must not be invoked since no DLQ topic is configured for
the group.
+ Mockito.verify(dlqManager, Mockito.never()).enqueue(Mockito.any());
+ }
+
+ @Test
+ public void
testReleaseAcquiredRecordsMaxDeliveryWithDlqSupplierEnabledButNoDlqTopicConfigured()
{
+ // The DLQ supplier returns true, but the group has no DLQ topic
configured (the default
+ // ShareGroupConfigProvider returns an empty topic name). DLQ must
therefore be treated as
+ // disabled: when the delivery count reaches the max on release,
records go directly to
+ // ARCHIVED (no ARCHIVING intermediate state) and the DLQ is never
invoked.
+ ShareGroupDLQManager dlqManager = mockDlqManager();
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withMaxDeliveryCount(2) // Only 2 delivery attempts will be made
before archiving the records.
+ .withState(SharePartitionState.ACTIVE)
+ .withShareGroupDlqEnableSupplier(() -> true)
+ .withShareGroupDlqManager(dlqManager)
+ .build();
+
+ // Leading batch (offsets 0-9) stays acquired so the archived batch
remains in cache.
+ fetchAcquiredRecords(sharePartition, memoryRecords(10), 10);
+
+ MemoryRecords records2 = memoryRecords(10, 5);
+ // First delivery attempt for offsets 10-14, delivery count becomes 1.
+ fetchAcquiredRecords(sharePartition, records2, 5);
+ // Release them back to AVAILABLE.
+ sharePartition.acknowledge(MEMBER_ID, List.of(
+ new ShareAcknowledgementBatch(10, 14,
List.of(AcknowledgeType.RELEASE.id))));
+ // Second delivery attempt, delivery count reaches the max (2).
+ fetchAcquiredRecords(sharePartition, records2, 5);
+
+ // Release again. Delivery count has reached the max, so the records
are archived.
+ CompletableFuture<Void> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ assertNull(releaseResult.join());
+ assertFalse(releaseResult.isCompletedExceptionally());
+
+ // Without an effective DLQ, the records go directly to ARCHIVED (not
ARCHIVING).
+ assertEquals(2, sharePartition.cachedState().size());
+ assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(10L).batchState());
+ assertNull(sharePartition.cachedState().get(10L).offsetState());
+
+ // The DLQ must not be invoked since no DLQ topic is configured for
the group.
+ Mockito.verify(dlqManager, Mockito.never()).enqueue(Mockito.any());
}
@Test
public void testAcknowledgePerOffsetRejectWithDlqEnabled() {
+ ShareGroupDLQManager dlqManager = mockDlqManager();
SharePartition sharePartition = SharePartitionBuilder.builder()
.withState(SharePartitionState.ACTIVE)
.withShareGroupDlqEnableSupplier(() -> true)
+ .withConfigProvider(configProviderWithDlqTopic())
+ .withShareGroupDlqManager(dlqManager)
.build();
// Acquire a batch with 5 records (offsets 0-4) and a second batch to
keep cache populated.
@@ -12719,13 +12814,18 @@ public class SharePartitionTest {
assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(5L).batchState());
assertEquals(5, sharePartition.startOffset());
assertEquals(0, sharePartition.deliveryCompleteCount());
+
+ // Offsets 3 and 4 are rejected per-offset, so each is enqueued to the
DLQ separately.
+ Mockito.verify(dlqManager, Mockito.times(2)).enqueue(Mockito.any());
}
@Test
public void testAcknowledgePerOffsetRejectWithDlqDisabled() {
+ ShareGroupDLQManager dlqManager = mockDlqManager();
SharePartition sharePartition = SharePartitionBuilder.builder()
.withState(SharePartitionState.ACTIVE)
.withShareGroupDlqEnableSupplier(() -> false)
+ .withShareGroupDlqManager(dlqManager)
.build();
MemoryRecords records1 = memoryRecords(5);
@@ -12751,15 +12851,21 @@ public class SharePartitionTest {
assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(5L).batchState());
assertEquals(5, sharePartition.startOffset());
assertEquals(0, sharePartition.deliveryCompleteCount());
+
+ // DLQ is disabled, so the DLQ is never invoked.
+ Mockito.verify(dlqManager, Mockito.never()).enqueue(Mockito.any());
}
@Test
public void testAcquisitionLockTimeoutWithDlqEnabledCompleteBatch() throws
InterruptedException {
+ ShareGroupDLQManager dlqManager = mockDlqManager();
SharePartition sharePartition = SharePartitionBuilder.builder()
.withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
.withMaxDeliveryCount(2)
.withState(SharePartitionState.ACTIVE)
.withShareGroupDlqEnableSupplier(() -> true)
+ .withConfigProvider(configProviderWithDlqTopic())
+ .withShareGroupDlqManager(dlqManager)
.build();
// Acquire two batches so the first stays in cache after being
archived.
@@ -12804,15 +12910,21 @@ public class SharePartitionTest {
assertEquals(0, sharePartition.deliveryCompleteCount());
// Second batch remains AVAILABLE.
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(10L).batchState());
+
+ // The archived complete batch (offsets 0-9) is enqueued to the DLQ
exactly once.
+ Mockito.verify(dlqManager, Mockito.times(1)).enqueue(Mockito.any());
}
@Test
public void testAcquisitionLockTimeoutWithDlqEnabledPerOffsetBatch()
throws InterruptedException {
+ ShareGroupDLQManager dlqManager = mockDlqManager();
SharePartition sharePartition = SharePartitionBuilder.builder()
.withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
.withMaxDeliveryCount(2)
.withState(SharePartitionState.ACTIVE)
.withShareGroupDlqEnableSupplier(() -> true)
+ .withConfigProvider(configProviderWithDlqTopic())
+ .withShareGroupDlqManager(dlqManager)
.build();
// Acquire a batch of 10 records (offsets 0-9).
@@ -12867,16 +12979,21 @@ public class SharePartitionTest {
// Offsets 0-4 are ARCHIVED, 5-9 are AVAILABLE. Next fetch offset
moves to 5
// since offsets 0-4 are no longer fetchable.
assertEquals(5, sharePartition.nextFetchOffset());
+
+ // Offsets 0-4 are archived per-offset, so each is enqueued to the DLQ
separately.
+ Mockito.verify(dlqManager, Mockito.times(5)).enqueue(Mockito.any());
}
@Test
public void testAcquisitionLockTimeoutWithDlqDisabledCompleteBatch()
throws InterruptedException {
// Verify that without DLQ, max delivery count still causes ARCHIVED
(not ARCHIVING).
+ ShareGroupDLQManager dlqManager = mockDlqManager();
SharePartition sharePartition = SharePartitionBuilder.builder()
.withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
.withMaxDeliveryCount(2)
.withState(SharePartitionState.ACTIVE)
.withShareGroupDlqEnableSupplier(() -> false)
+ .withShareGroupDlqManager(dlqManager)
.build();
// Acquire two batches.
@@ -12906,16 +13023,22 @@ public class SharePartitionTest {
// Batch evicted, start offset advances.
assertEquals(10, sharePartition.startOffset());
assertEquals(0, sharePartition.deliveryCompleteCount());
+
+ // DLQ is disabled, so the DLQ is never invoked.
+ Mockito.verify(dlqManager, Mockito.never()).enqueue(Mockito.any());
}
@Test
public void testAcquisitionLockTimeoutWithDlqEnabledMixedOffsets() throws
InterruptedException {
// Test where some offsets in a batch exceed max delivery count and
some don't.
+ ShareGroupDLQManager dlqManager = mockDlqManager();
SharePartition sharePartition = SharePartitionBuilder.builder()
.withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
.withMaxDeliveryCount(2)
.withState(SharePartitionState.ACTIVE)
.withShareGroupDlqEnableSupplier(() -> true)
+ .withConfigProvider(configProviderWithDlqTopic())
+ .withShareGroupDlqManager(dlqManager)
.build();
// Acquire batch of 10 records (offsets 0-9).
@@ -12954,12 +13077,16 @@ public class SharePartitionTest {
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> "Batch at offset 0 was not evicted after DLQ archival. Timer
size: " +
sharePartition.timer().size() + ", cachedState keys: " +
sharePartition.cachedState().keySet());
+
+ // Offsets 5-9 are archived per-offset, so each is enqueued to the DLQ
separately.
+ Mockito.verify(dlqManager, Mockito.times(5)).enqueue(Mockito.any());
}
@Test
public void
testAcquisitionLockTimeoutWithDlqEnabledWriteFailureCompleteBatch() throws
InterruptedException {
// Phase 1 persist of ARCHIVING fails, but phase 2 still proceeds
unconditionally
// because timeout path uses tryUpdateState (no rollback).
+ ShareGroupDLQManager dlqManager = mockDlqManager();
Persister persister = Mockito.mock(Persister.class);
mockPersisterReadStateMethod(persister);
@@ -12976,6 +13103,8 @@ public class SharePartitionTest {
.withMaxDeliveryCount(2)
.withState(SharePartitionState.ACTIVE)
.withShareGroupDlqEnableSupplier(() -> true)
+ .withConfigProvider(configProviderWithDlqTopic())
+ .withShareGroupDlqManager(dlqManager)
.build();
// Acquire two batches.
@@ -13013,11 +13142,15 @@ public class SharePartitionTest {
assertEquals(0, sharePartition.deliveryCompleteCount());
// Second batch went to AVAILABLE (delivery count 1 < max).
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(10L).batchState());
+
+ // The archived complete batch (offsets 0-9) is enqueued to the DLQ
exactly once.
+ Mockito.verify(dlqManager, Mockito.times(1)).enqueue(Mockito.any());
}
@Test
public void
testAcquisitionLockTimeoutWithDlqEnabledWriteFailurePerOffsetBatch() throws
InterruptedException {
// Phase 1 persist of ARCHIVING fails for per-offset batch, but phase
2 still proceeds.
+ ShareGroupDLQManager dlqManager = mockDlqManager();
Persister persister = Mockito.mock(Persister.class);
mockPersisterReadStateMethod(persister);
@@ -13034,6 +13167,8 @@ public class SharePartitionTest {
.withMaxDeliveryCount(2)
.withState(SharePartitionState.ACTIVE)
.withShareGroupDlqEnableSupplier(() -> true)
+ .withConfigProvider(configProviderWithDlqTopic())
+ .withShareGroupDlqManager(dlqManager)
.build();
// Acquire batch of 10 records (offsets 0-9).
@@ -13082,12 +13217,16 @@ public class SharePartitionTest {
// Despite both persists failing, offsets 0-4 reached ARCHIVED in
memory (no rollback for timeout).
// Offsets 5-9 remain AVAILABLE.
assertEquals(5, sharePartition.nextFetchOffset());
+
+ // Offsets 0-4 are archived per-offset, so each is enqueued to the DLQ
separately.
+ Mockito.verify(dlqManager, Mockito.times(5)).enqueue(Mockito.any());
}
@Test
public void
testAcquisitionLockTimeoutWithDlqPhase1FailsPhase2SucceedsCompleteBatch()
throws InterruptedException {
// Phase 1 persist (ARCHIVING) fails, phase 2 persist (ARCHIVED)
succeeds.
// Records should reach ARCHIVED despite phase 1 failure.
+ ShareGroupDLQManager dlqManager = mockDlqManager();
Persister persister = Mockito.mock(Persister.class);
mockPersisterReadStateMethod(persister);
@@ -13112,6 +13251,8 @@ public class SharePartitionTest {
.withMaxDeliveryCount(2)
.withState(SharePartitionState.ACTIVE)
.withShareGroupDlqEnableSupplier(() -> true)
+ .withConfigProvider(configProviderWithDlqTopic())
+ .withShareGroupDlqManager(dlqManager)
.build();
// Acquire two batches.
@@ -13152,11 +13293,15 @@ public class SharePartitionTest {
assertEquals(10, sharePartition.startOffset());
assertEquals(0, sharePartition.deliveryCompleteCount());
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(10L).batchState());
+
+ // The archived complete batch (offsets 0-9) is enqueued to the DLQ
exactly once.
+ Mockito.verify(dlqManager, Mockito.times(1)).enqueue(Mockito.any());
}
@Test
public void
testAcquisitionLockTimeoutWithDlqPhase1FailsPhase2SucceedsPerOffsetBatch()
throws InterruptedException {
// Phase 1 persist (ARCHIVING) fails, phase 2 persist (ARCHIVED)
succeeds for per-offset batch.
+ ShareGroupDLQManager dlqManager = mockDlqManager();
Persister persister = Mockito.mock(Persister.class);
mockPersisterReadStateMethod(persister);
@@ -13181,6 +13326,8 @@ public class SharePartitionTest {
.withMaxDeliveryCount(2)
.withState(SharePartitionState.ACTIVE)
.withShareGroupDlqEnableSupplier(() -> true)
+ .withConfigProvider(configProviderWithDlqTopic())
+ .withShareGroupDlqManager(dlqManager)
.build();
// Acquire batch of 10 records (offsets 0-9).
@@ -13233,6 +13380,9 @@ public class SharePartitionTest {
// Phase 1 failed but phase 2 succeeded — offsets 0-4 reached ARCHIVED.
// Offsets 5-9 remain AVAILABLE.
assertEquals(5, sharePartition.nextFetchOffset());
+
+ // Offsets 0-4 are archived per-offset, so each is enqueued to the DLQ
separately.
+ Mockito.verify(dlqManager, Mockito.times(5)).enqueue(Mockito.any());
}
// Unit tests for processDlqPhase2 method directly.
@@ -13264,10 +13414,12 @@ public class SharePartitionTest {
PartitionFactory.newPartitionErrorData(0, error.code(),
error.message())))));
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeResult));
+ ShareGroupDLQManager dlqManager = mockDlqManager();
SharePartition sharePartition = SharePartitionBuilder.builder()
.withPersister(persister)
.withState(SharePartitionState.ACTIVE)
.withShareGroupDlqEnableSupplier(() -> true)
+ .withShareGroupDlqManager(dlqManager)
.build();
InFlightState state = new InFlightState(RecordState.ARCHIVING,
deliveryCount, EMPTY_MEMBER_ID);
@@ -13277,6 +13429,16 @@ public class SharePartitionTest {
assertEquals(expectedState, state.state());
assertFalse(state.hasOngoingStateTransition());
+ // Verify the records were enqueued to the DLQ exactly once with the
expected parameters.
+ ArgumentCaptor<ShareGroupDLQRecordParameter> dlqCaptor =
+ ArgumentCaptor.forClass(ShareGroupDLQRecordParameter.class);
+ Mockito.verify(dlqManager,
Mockito.times(1)).enqueue(dlqCaptor.capture());
+ ShareGroupDLQRecordParameter dlqParam = dlqCaptor.getValue();
+ assertEquals(firstOffset, dlqParam.firstOffset());
+ assertEquals(lastOffset, dlqParam.lastOffset());
+ assertEquals(Optional.of(deliveryCount), dlqParam.deliveryCount());
+ assertEquals(Optional.ofNullable(dlqCause), dlqParam.cause());
+
// Verify persister.writeState was called exactly once with the
correct state batch.
ArgumentCaptor<WriteShareGroupStateParameters> captor =
ArgumentCaptor.forClass(WriteShareGroupStateParameters.class);
@@ -13302,6 +13464,20 @@ public class SharePartitionTest {
Mockito.verify(persister, Mockito.never()).readState(Mockito.any());
}
+ private static ShareGroupDLQManager mockDlqManager() {
+ ShareGroupDLQManager dlqManager =
Mockito.mock(ShareGroupDLQManager.class);
+
Mockito.when(dlqManager.enqueue(Mockito.any())).thenReturn(CompletableFuture.completedFuture(null));
+ return dlqManager;
+ }
+
+ private static ShareGroupConfigProvider configProviderWithDlqTopic() {
+ GroupConfigManager groupConfigManager =
Mockito.mock(GroupConfigManager.class);
+ GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
+
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
+
Mockito.when(groupConfig.errorsDLQTopicName()).thenReturn("test-dlq-topic");
+ return new ShareGroupConfigProvider(groupConfigManager);
+ }
+
private static ShareGroupConfigProvider configProviderWithRenewDisabled() {
ShareGroupConfigProvider configProvider =
Mockito.mock(ShareGroupConfigProvider.class);
Mockito.when(configProvider.isRenewAcknowledgeEnabled(GROUP_ID)).thenReturn(false);
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java
index 868236f9c63..ef00633dbb7 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java
@@ -20,6 +20,8 @@ import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
+import java.util.Optional;
+
/**
* A provider that retrieves share group dynamic configuration values,
* falling back to default values when group-specific configurations are not
present.
@@ -98,4 +100,17 @@ public class ShareGroupConfigProvider {
.flatMap(GroupConfig::shareAutoOffsetReset)
.orElseGet(GroupConfig::defaultShareAutoOffsetReset);
}
+
+ /**
+ * The method is used to get the name of the configured DLQ topic on the
share group. If the group config
+ * is present, then the value from the group config is used. Otherwise,
empty optional is returned.
+ *
+ * @param groupId The group id for which the DLQ topic name is to be
fetched.
+ * @return Optional representing DLQ topic name for the share group, empty
if not found.
+ */
+ public Optional<String> errorsDLQTopicName(String groupId) {
+ return manager.groupConfig(groupId)
+ .map(GroupConfig::errorsDLQTopicName)
+ .filter(val -> !val.isEmpty());
+ }
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java
index a426b51554b..ba4905f63f3 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java
@@ -132,4 +132,27 @@ public class ShareGroupConfigProviderTest {
assertEquals(GroupConfig.defaultShareAutoOffsetReset(),
provider.autoOffsetReset("test-group"));
}
+
+ @Test
+ void testShareGroupDLQTopicWithGroupConfig() {
+ String shareGroupDLQTopicName = "dlq.testGroupDLQTopic";
+ String shareGroupId = "test-group";
+ GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
+ GroupConfig groupConfig = mock(GroupConfig.class);
+
when(groupConfig.errorsDLQTopicName()).thenReturn(shareGroupDLQTopicName);
+
when(groupConfigManager.groupConfig(shareGroupId)).thenReturn(Optional.of(groupConfig));
+ provider = new ShareGroupConfigProvider(groupConfigManager);
+
+ assertEquals(Optional.of(shareGroupDLQTopicName),
provider.errorsDLQTopicName(shareGroupId));
+ }
+
+ @Test
+ void testShareGroupDLQTopicWithoutGroupConfig() {
+ String shareGroupId = "test-group";
+ GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
+
when(groupConfigManager.groupConfig(shareGroupId)).thenReturn(Optional.empty());
+ provider = new ShareGroupConfigProvider(groupConfigManager);
+
+ assertEquals(Optional.empty(),
provider.errorsDLQTopicName(shareGroupId));
+ }
}