This is an automated email from the ASF dual-hosted git repository.
apoorvmittal10 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cdc9eb2c3a7 KAFKA-20245: DLQ records exceeding max delivery count.
[3/N] (#22080)
cdc9eb2c3a7 is described below
commit cdc9eb2c3a723df0e0730c1a29efab39ab63e03f
Author: Sushant Mahajan <[email protected]>
AuthorDate: Wed May 6 15:46:02 2026 +0530
KAFKA-20245: DLQ records exceeding max delivery count. [3/N] (#22080)
* Adds support for sending offsets and batches that have exceeded the max
delivery count to the DLQ.
* Phase 2 of the DLQ transition (ARCHIVING -> ARCHIVED, both in memory and
in the persister) has been refactored into a separate method.
* The current code does not handle state transitions transactionally when
the max delivery count is exceeded. The ARCHIVING transition follows the
same approach.
* The acquisition lock timeout handler now invokes the DLQ logic for cases
where `InFlightState.tryUpdateState` determines that the record should be
in the ARCHIVING state.
Reviewers: Apoorv Mittal <[email protected]>
---
.../java/kafka/server/share/SharePartition.java | 238 +++++----
.../kafka/server/share/SharePartitionTest.java | 560 ++++++++++++++++++++-
.../server/share/dlq/NoOpShareGroupDLQManager.java | 2 +-
.../kafka/server/share/fetch/InFlightBatch.java | 10 +-
.../kafka/server/share/fetch/InFlightState.java | 8 +-
5 files changed, 713 insertions(+), 105 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index 7e86a4477a6..2fbca75a72a 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -938,7 +938,7 @@ public class SharePartition {
continue;
}
- InFlightState updateResult =
inFlightBatch.tryUpdateBatchState(RecordState.ACQUIRED,
DeliveryCountOps.INCREASE, maxDeliveryCount(), memberId);
+ InFlightState updateResult =
inFlightBatch.tryUpdateBatchState(RecordState.ACQUIRED,
DeliveryCountOps.INCREASE, maxDeliveryCount(), memberId,
shareGroupDlqEnableSupplier.get());
if (updateResult == null || updateResult.state() !=
RecordState.ACQUIRED) {
log.info("Unable to acquire records for the batch: {} in
share partition: {}-{}",
inFlightBatch, groupId, topicIdPartition);
@@ -1129,7 +1129,8 @@ public class SharePartition {
offsetState.getKey() < startOffset ?
RecordState.ARCHIVED : recordState,
DeliveryCountOps.NO_OP,
this.maxDeliveryCount(),
- EMPTY_MEMBER_ID
+ EMPTY_MEMBER_ID,
+ shareGroupDlqEnableSupplier.get()
);
if (updateResult == null) {
log.debug("Unable to release records from acquired state
for the offset: {} in batch: {}"
@@ -1139,8 +1140,9 @@ public class SharePartition {
}
// Successfully updated the state of the offset and created a
persister state batch for write to persister.
+ Throwable dlqCause = updateResult.state() ==
RecordState.ARCHIVING ? ShareGroupDLQ.DELIVERY_COUNT_EXCEEDED : null;
persisterBatches.add(new PersisterBatch(updateResult, new
PersisterStateBatch(offsetState.getKey(),
- offsetState.getKey(), updateResult.state().id(), (short)
updateResult.deliveryCount()), null));
+ offsetState.getKey(), updateResult.state().id(), (short)
updateResult.deliveryCount()), dlqCause));
if (offsetState.getKey() >= startOffset &&
isStateTerminal(updateResult.state())) {
deliveryCompleteCount.incrementAndGet();
}
@@ -1171,7 +1173,8 @@ public class SharePartition {
inFlightBatch.lastOffset() < startOffset ?
RecordState.ARCHIVED : recordState,
DeliveryCountOps.NO_OP,
this.maxDeliveryCount(),
- EMPTY_MEMBER_ID
+ EMPTY_MEMBER_ID,
+ shareGroupDlqEnableSupplier.get()
);
if (updateResult == null) {
log.debug("Unable to release records from acquired state for
the batch: {}"
@@ -1180,8 +1183,10 @@ public class SharePartition {
}
// Successfully updated the state of the batch and created a
persister state batch for write to persister.
+ // If DLQ support is enabled, then update the DLQ cause exception
message.
+ Throwable dlqCause = updateResult.state() == RecordState.ARCHIVING
? ShareGroupDLQ.DELIVERY_COUNT_EXCEEDED : null;
persisterBatches.add(new PersisterBatch(updateResult, new
PersisterStateBatch(inFlightBatch.firstOffset(),
- inFlightBatch.lastOffset(), updateResult.state().id(), (short)
updateResult.deliveryCount()), null));
+ inFlightBatch.lastOffset(), updateResult.state().id(), (short)
updateResult.deliveryCount()), dlqCause));
if (isStateTerminal(updateResult.state())) {
deliveryCompleteCount.addAndGet(numInFlightRecordsInBatch(inFlightBatch.firstOffset(),
inFlightBatch.lastOffset()));
}
@@ -1991,7 +1996,7 @@ public class SharePartition {
}
InFlightState updateResult =
offsetState.getValue().tryUpdateState(RecordState.ACQUIRED,
DeliveryCountOps.INCREASE,
- maxDeliveryCount, memberId);
+ maxDeliveryCount, memberId,
shareGroupDlqEnableSupplier.get());
if (updateResult == null || updateResult.state() !=
RecordState.ACQUIRED) {
log.trace("Unable to acquire records for the offset: {} in
batch: {}"
+ " for the share partition: {}-{}",
offsetState.getKey(), inFlightBatch,
@@ -2344,7 +2349,8 @@ public class SharePartition {
recordState,
DeliveryCountOps.NO_OP,
this.maxDeliveryCount(),
- EMPTY_MEMBER_ID
+ EMPTY_MEMBER_ID,
+ shareGroupDlqEnableSupplier.get()
);
if (updateResult == null) {
@@ -2355,6 +2361,12 @@ public class SharePartition {
"Unable to acknowledge records for the batch"));
}
+ // This check makes sure that we don't skip the cause if
updated result
+ // results in ARCHIVING due to max delivery count exceeded.
+ if (dlqCause == null && updateResult.state() ==
RecordState.ARCHIVING) {
+ dlqCause = ShareGroupDLQ.DELIVERY_COUNT_EXCEEDED;
+ }
+
// Successfully updated the state of the offset and
created a persister state batch for write to persister.
persisterBatches.add(new PersisterBatch(updateResult, new
PersisterStateBatch(offsetState.getKey(),
offsetState.getKey(), updateResult.state().id(),
(short) updateResult.deliveryCount()), dlqCause));
@@ -2425,7 +2437,8 @@ public class SharePartition {
recordState,
DeliveryCountOps.NO_OP,
this.maxDeliveryCount(),
- EMPTY_MEMBER_ID
+ EMPTY_MEMBER_ID,
+ shareGroupDlqEnableSupplier.get()
);
if (updateResult == null) {
log.debug("Unable to acknowledge records for the batch: {}
with state: {}"
@@ -2435,6 +2448,12 @@ public class SharePartition {
new InvalidRecordStateException("Unable to acknowledge
records for the batch"));
}
+ // This check makes sure that we don't skip the cause if updated
result
+ // results in ARCHIVING due to max delivery count exceeded.
+ if (dlqCause == null && updateResult.state() ==
RecordState.ARCHIVING) {
+ dlqCause = ShareGroupDLQ.DELIVERY_COUNT_EXCEEDED;
+ }
+
// Successfully updated the state of the batch and created a
persister state batch for write to persister.
persisterBatches.add(new PersisterBatch(updateResult, new
PersisterStateBatch(inFlightBatch.firstOffset(),
inFlightBatch.lastOffset(), updateResult.state().id(), (short)
updateResult.deliveryCount()), dlqCause));
@@ -2521,6 +2540,7 @@ public class SharePartition {
// on the startOffset to move ahead, hence track if the state
is updated in the cache. If
// yes, then notify the delayed share fetch purgatory to
complete the pending requests.
boolean cacheStateUpdated = false;
+ List<PersisterBatch> dlqBatches = new
ArrayList<>(persisterBatches.size());
lock.writeLock().lock();
try {
if (exception != null) {
@@ -2547,95 +2567,16 @@ public class SharePartition {
log.trace("State change request successful for share
partition: {}-{}",
groupId, topicIdPartition);
- List<PersisterBatch> nonDlqBatches = new
ArrayList<>(persisterBatches.size());
- List<PersisterBatch> dlqBatches = new
ArrayList<>(persisterBatches.size());
for (PersisterBatch persisterBatch : persisterBatches) {
+
persisterBatch.updatedState().completeStateTransition(true);
+ if (persisterBatch.updatedState.state() ==
RecordState.AVAILABLE) {
+ updateFindNextFetchOffset(true);
+ }
if (persisterBatch.updatedState.state() ==
RecordState.ARCHIVING) {
dlqBatches.add(persisterBatch);
- } else {
- nonDlqBatches.add(persisterBatch);
}
}
- nonDlqBatches.forEach(persisterBatch -> {
-
persisterBatch.updatedState.completeStateTransition(true);
- if (persisterBatch.updatedState.state() ==
RecordState.AVAILABLE) {
- updateFindNextFetchOffset(true);
- }
- });
-
- dlqBatches.forEach(persisterBatch -> {
-
persisterBatch.updatedState.completeStateTransition(true);
- shareGroupDLQ.enqueue(new ShareGroupDLQRecordParameter(
- groupId,
- topicIdPartition,
- persisterBatch.stateBatch.firstOffset(),
- persisterBatch.stateBatch.lastOffset(),
-
Optional.of(persisterBatch.stateBatch.deliveryCount()),
- Optional.ofNullable(persisterBatch.dlqCause),
- false
- )).whenComplete((v1, dlqException) -> {
- PersisterStateBatch sb = persisterBatch.stateBatch;
- if (dlqException != null) {
- log.error("Failed to write to DLQ for share
partition: {}-{}, offsets {}-{}. "
- + "Proceeding to ARCHIVED state
regardless.",
- groupId, topicIdPartition,
sb.firstOffset(), sb.lastOffset(), dlqException);
- }
-
- PersisterBatch phase2Batch;
- lock.writeLock().lock();
- try {
- // dlqBatch.updatedState() is the same
InFlightState object in the cache,
- // now committed in ARCHIVING. Transition it
directly.
- InFlightState updateResult =
persisterBatch.updatedState().startStateTransition(
- RecordState.ARCHIVED,
- DeliveryCountOps.NO_OP,
- maxDeliveryCount(),
- EMPTY_MEMBER_ID
- );
- if (updateResult == null) {
- log.error("Unable to transition ARCHIVING
→ ARCHIVED for offsets {}-{} "
- + "in share partition: {}-{}",
sb.firstOffset(), sb.lastOffset(),
- groupId, topicIdPartition);
- return;
- }
- phase2Batch = new PersisterBatch(updateResult,
new PersisterStateBatch(
- sb.firstOffset(), sb.lastOffset(),
- updateResult.state().id(), (short)
updateResult.deliveryCount()), null);
- deliveryCompleteCount.addAndGet(
-
numInFlightRecordsInBatch(sb.firstOffset(), sb.lastOffset()));
- } finally {
- lock.writeLock().unlock();
- }
-
- // Second persist: ARCHIVING → ARCHIVED
-
writeShareGroupState(List.of(phase2Batch.stateBatch()))
- .whenComplete((v2, phase2Exception) -> {
- boolean phase2CacheUpdated = false;
- lock.writeLock().lock();
- try {
- if (phase2Exception != null) {
- log.error("Failed to persist
ARCHIVED state for DLQ phase 2, "
- + "share partition: {}-{}.
Records remain in ARCHIVING.",
- groupId, topicIdPartition,
phase2Exception);
-
phase2Batch.updatedState().completeStateTransition(false);
- if
(isStateTerminal(RecordState.forId(phase2Batch.stateBatch().deliveryState()))
- &&
!isStateTerminal(phase2Batch.updatedState().state())) {
-
deliveryCompleteCount.addAndGet(
-
-numInFlightRecordsInBatch(sb.firstOffset(), sb.lastOffset()));
- }
- return;
- }
-
-
phase2Batch.updatedState().completeStateTransition(true);
- phase2CacheUpdated =
maybeUpdateCachedStateAndOffsets();
- } finally {
- lock.writeLock().unlock();
-
maybeCompleteDelayedShareFetchRequest(phase2CacheUpdated);
- }
- });
- });
- });
// Update the cached state and start and end offsets after
acknowledging/releasing the acquired records.
cacheStateUpdated = maybeUpdateCachedStateAndOffsets();
} finally {
@@ -2650,6 +2591,17 @@ public class SharePartition {
// request can be completed. The call should be made
outside the lock to avoid deadlock.
maybeCompleteDelayedShareFetchRequest(cacheStateUpdated);
}
+
+ // Persister batch state has been moved to ARCHIVING, we must
now start the DLQ flow and transition to ARCHIVED.
+ dlqBatches.forEach(persisterBatch -> {
+ initiateDLQAndArchive(
+ persisterBatch.updatedState,
+ persisterBatch.stateBatch.firstOffset(),
+ persisterBatch.stateBatch.lastOffset(),
+ persisterBatch.stateBatch.deliveryCount(),
+ persisterBatch.dlqCause
+ );
+ });
});
}
@@ -2982,6 +2934,8 @@ public class SharePartition {
private AcquisitionLockTimeoutHandler releaseAcquisitionLockOnTimeout() {
return (memberId, firstOffset, lastOffset, timerTask) -> {
List<PersisterStateBatch> stateBatches;
+ List<DlqBatch> dlqBatches;
+
lock.writeLock().lock();
try {
// Check if timer task is already cancelled. This can happen
when concurrent requests
@@ -2998,6 +2952,7 @@ public class SharePartition {
return;
}
stateBatches = new ArrayList<>();
+ dlqBatches = new ArrayList<>();
NavigableMap<Long, InFlightBatch> subMap =
cachedState.subMap(floorOffset.getKey(), true, lastOffset, true);
for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet())
{
InFlightBatch inFlightBatch = entry.getValue();
@@ -3013,9 +2968,9 @@ public class SharePartition {
// Case when the state of complete batch is valid
if (inFlightBatch.offsetState() == null) {
-
releaseAcquisitionLockOnTimeoutForCompleteBatch(inFlightBatch, stateBatches,
memberId);
+
releaseAcquisitionLockOnTimeoutForCompleteBatch(inFlightBatch, stateBatches,
dlqBatches, memberId);
} else { // Case when batch has a valid offset state map.
-
releaseAcquisitionLockOnTimeoutForPerOffsetBatch(inFlightBatch, stateBatches,
memberId, firstOffset, lastOffset);
+
releaseAcquisitionLockOnTimeoutForPerOffsetBatch(inFlightBatch, stateBatches,
dlqBatches, memberId, firstOffset, lastOffset);
}
}
} finally {
@@ -3031,6 +2986,15 @@ public class SharePartition {
// Even if write share group state RPC call fails, we will
still go ahead with the state transition.
// Update the cached state and start and end offsets after
releasing the acquisition lock on timeout.
maybeUpdateCachedStateAndOffsets();
+
+ // Persister batch state has been moved to ARCHIVING, we
must now start the DLQ flow and transition to ARCHIVED.
+ dlqBatches.forEach(dlqBatch -> initiateDLQAndArchive(
+ dlqBatch.updatedState(),
+ dlqBatch.firstOffset(),
+ dlqBatch.lastOffset(),
+ dlqBatch.deliveryCount(),
+ ShareGroupDLQ.DELIVERY_COUNT_EXCEEDED
+ ));
});
}
@@ -3043,13 +3007,15 @@ public class SharePartition {
private void releaseAcquisitionLockOnTimeoutForCompleteBatch(InFlightBatch
inFlightBatch,
List<PersisterStateBatch> stateBatches,
+
List<DlqBatch> dlqBatches,
String
memberId) {
if (inFlightBatch.batchState() == RecordState.ACQUIRED) {
InFlightState updateResult = inFlightBatch.tryUpdateBatchState(
inFlightBatch.lastOffset() < startOffset ?
RecordState.ARCHIVED : RecordState.AVAILABLE,
DeliveryCountOps.NO_OP,
maxDeliveryCount(),
- EMPTY_MEMBER_ID);
+ EMPTY_MEMBER_ID,
+ shareGroupDlqEnableSupplier.get());
if (updateResult == null) {
log.error("Unable to release acquisition lock on timeout for
the batch: {}"
+ " for the share partition: {}-{} memberId: {}",
inFlightBatch, groupId, topicIdPartition, memberId);
@@ -3060,6 +3026,16 @@ public class SharePartition {
// Cancel the acquisition lock timeout task for the batch since it
is completed now.
updateResult.cancelAndClearAcquisitionLockTimeoutTask();
+
+ if (updateResult.state() == RecordState.ARCHIVING) {
+ // Don't increment deliveryCompleteCount here — deferred to
phase 2
+ // Don't updateFindNextFetchOffset — ARCHIVING is not fetchable
+ dlqBatches.add(new DlqBatch(updateResult,
+ inFlightBatch.firstOffset(), inFlightBatch.lastOffset(),
+ (short) updateResult.deliveryCount()));
+ return;
+ }
+
if (updateResult.state() != RecordState.ARCHIVED) {
updateFindNextFetchOffset(true);
}
@@ -3079,6 +3055,7 @@ public class SharePartition {
private void
releaseAcquisitionLockOnTimeoutForPerOffsetBatch(InFlightBatch inFlightBatch,
List<PersisterStateBatch> stateBatches,
+
List<DlqBatch> dlqBatches,
String
memberId,
long
firstOffset,
long
lastOffset) {
@@ -3103,7 +3080,8 @@ public class SharePartition {
offsetState.getKey() < startOffset ? RecordState.ARCHIVED
: RecordState.AVAILABLE,
DeliveryCountOps.NO_OP,
maxDeliveryCount(),
- EMPTY_MEMBER_ID);
+ EMPTY_MEMBER_ID,
+ shareGroupDlqEnableSupplier.get());
if (updateResult == null) {
log.error("Unable to release acquisition lock on timeout for
the offset: {} in batch: {}"
+ " for the share partition: {}-{} memberId:
{}", offsetState.getKey(), inFlightBatch,
@@ -3115,6 +3093,15 @@ public class SharePartition {
// Cancel the acquisition lock timeout task for the offset since
it is completed now.
updateResult.cancelAndClearAcquisitionLockTimeoutTask();
+
+ if (updateResult.state() == RecordState.ARCHIVING) {
+ // Don't increment deliveryCompleteCount here — deferred to
phase 2
+ // Don't updateFindNextFetchOffset — ARCHIVING is not fetchable
+ dlqBatches.add(new DlqBatch(updateResult, offsetState.getKey(),
+ offsetState.getKey(), (short)
updateResult.deliveryCount()));
+ continue;
+ }
+
if (updateResult.state() != RecordState.ARCHIVED) {
updateFindNextFetchOffset(true);
}
@@ -3339,6 +3326,57 @@ public class SharePartition {
return ACK_TYPE_TO_RECORD_STATE.get(ackType);
}
+ // Visible for testing.
+
+ /**
+ * The DLQ flow comprises 2 phases:
+ * Phase 1: State transitions to ARCHIVING (happens in the normal
acknowledge/release/timeout path) and persists ARCHIVING to the persister
+ * Phase 2: Enqueues to DLQ, then transitions ARCHIVING → ARCHIVED and
persists ARCHIVED to the persister
+ * This method handles the complete phase 2 flow.
+ */
+ void initiateDLQAndArchive(InFlightState updatedState, long firstOffset,
+ long lastOffset, short deliveryCount, Throwable
dlqCause) {
+ // Step 1: Enqueue to DLQ
+ shareGroupDLQ.enqueue(new ShareGroupDLQRecordParameter(
+ groupId, topicIdPartition, firstOffset, lastOffset,
+ Optional.of(deliveryCount), Optional.ofNullable(dlqCause), false
+ )).whenComplete((v1, dlqException) -> {
+ if (dlqException != null) {
+ log.error("Failed to write to DLQ, proceeding to ARCHIVED
regardless.", dlqException);
+ }
+
+ // Step 2: Transition ARCHIVING → ARCHIVED
+ PersisterStateBatch stateBatch;
+ lock.writeLock().lock();
+ try {
+ // At this point ARCHIVED is imminent. If we rollback here or
tryUpdateState fails,
+ // we risk stalling. So just move to ARCHIVED.
+ updatedState.archive();
+ stateBatch = new PersisterStateBatch(firstOffset, lastOffset,
RecordState.ARCHIVED.id, deliveryCount);
+
deliveryCompleteCount.addAndGet(numInFlightRecordsInBatch(firstOffset,
lastOffset));
+ } finally {
+ lock.writeLock().unlock();
+ }
+
+ // Step 3: Persist ARCHIVED. On failure, ARCHIVED stays in memory
— the
+ // persister catches up when start offset advances past these
offsets.
+ writeShareGroupState(List.of(stateBatch))
+ .whenComplete((v2, phase2Exception) -> {
+ boolean cacheUpdated = false;
+ lock.writeLock().lock();
+ try {
+ if (phase2Exception != null) {
+ log.error("Could not persist ARCHIVED state for
{}", stateBatch, phase2Exception);
+ }
+ cacheUpdated = maybeUpdateCachedStateAndOffsets();
+ } finally {
+ lock.writeLock().unlock();
+ maybeCompleteDelayedShareFetchRequest(cacheUpdated);
+ }
+ });
+ });
+ }
+
// Visible for testing.
boolean containsAbortMarker(RecordBatch batch) {
if (!batch.isControlBatch())
@@ -3534,6 +3572,16 @@ public class SharePartition {
int maxRecords
) { }
+ /**
+ * Record comprising state as well as offset information for processing by
DLQ logic.
+ */
+ private record DlqBatch(
+ InFlightState updatedState,
+ long firstOffset, long lastOffset,
+ short deliveryCount
+ ) {
+ }
+
// Visibility for testing
static Map<Byte, RecordState> ackTypeToRecordStateMapping() {
return ACK_TYPE_TO_RECORD_STATE;
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index f7b91bcc93e..2732d4897f6 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -56,6 +56,7 @@ import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
import
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
+import org.apache.kafka.server.share.dlq.ShareGroupDLQ;
import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.InFlightBatch;
@@ -63,12 +64,15 @@ import org.apache.kafka.server.share.fetch.InFlightState;
import org.apache.kafka.server.share.fetch.RecordState;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
import org.apache.kafka.server.share.metrics.SharePartitionMetrics;
+import org.apache.kafka.server.share.persister.GroupTopicPartitionData;
import org.apache.kafka.server.share.persister.NoOpStatePersister;
import org.apache.kafka.server.share.persister.PartitionFactory;
+import org.apache.kafka.server.share.persister.PartitionStateBatchData;
import org.apache.kafka.server.share.persister.Persister;
import org.apache.kafka.server.share.persister.PersisterStateBatch;
import org.apache.kafka.server.share.persister.ReadShareGroupStateResult;
import org.apache.kafka.server.share.persister.TopicData;
+import org.apache.kafka.server.share.persister.WriteShareGroupStateParameters;
import org.apache.kafka.server.share.persister.WriteShareGroupStateResult;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchPartitionData;
@@ -81,6 +85,10 @@ import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import java.nio.ByteBuffer;
@@ -97,6 +105,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
+import java.util.stream.Stream;
import static kafka.server.share.SharePartition.EMPTY_MEMBER_ID;
import static
org.apache.kafka.server.share.fetch.ShareFetchTestUtils.memoryRecordsBuilder;
@@ -117,7 +126,7 @@ import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-@SuppressWarnings("resource")
+@SuppressWarnings({"resource", "ClassFanOutComplexity"})
public class SharePartitionTest {
private static final String ACQUISITION_LOCK_NEVER_GOT_RELEASED =
"Acquisition lock never got released.";
@@ -12738,6 +12747,555 @@ public class SharePartitionTest {
assertEquals(0, sharePartition.deliveryCompleteCount());
}
+ @Test
+ public void testAcquisitionLockTimeoutWithDlqEnabledCompleteBatch() throws
InterruptedException {
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+ .withMaxDeliveryCount(2)
+ .withState(SharePartitionState.ACTIVE)
+ .withShareGroupDlqEnableSupplier(() -> true)
+ .build();
+
+ // Acquire two batches so the first stays in cache after being
archived.
+ fetchAcquiredRecords(sharePartition, memoryRecords(10), 10);
+ fetchAcquiredRecords(sharePartition, memoryRecords(10, 10), 10);
+
+ assertEquals(2, sharePartition.timer().size());
+
+ // First timeout: delivery count 1 < maxDeliveryCount 2, so records go
to AVAILABLE.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+ TestUtils.waitForCondition(
+ () -> sharePartition.timer().size() == 0 &&
+ sharePartition.cachedState().get(0L).batchState() ==
RecordState.AVAILABLE &&
+ sharePartition.cachedState().get(0L).batchDeliveryCount() == 1
&&
+ sharePartition.cachedState().get(10L).batchState() ==
RecordState.AVAILABLE &&
+ sharePartition.cachedState().get(10L).batchDeliveryCount() ==
1,
+ DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+ () -> assertionFailedMessage(sharePartition, Map.of(0L, List.of(),
10L, List.of())));
+
+ assertEquals(0, sharePartition.deliveryCompleteCount());
+
+ // Re-acquire the first batch, bringing delivery count to 2 (==
maxDeliveryCount).
+ fetchAcquiredRecords(sharePartition, memoryRecords(10), 10);
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(0L).batchState());
+ assertEquals(2,
sharePartition.cachedState().get(0L).batchDeliveryCount());
+ assertEquals(1, sharePartition.timer().size());
+
+ // Second timeout: delivery count 2 >= maxDeliveryCount 2,
tryUpdateState redirects
+ // AVAILABLE -> ARCHIVING (DLQ enabled). Phase 2 completes: ARCHIVING
-> ARCHIVED.
+ // With NoOp DLQ + NoOp persister, the full 2-phase flow completes
synchronously.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+ TestUtils.waitForCondition(
+ () -> sharePartition.timer().size() == 0 &&
+ sharePartition.cachedState().get(0L) == null,
+ DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+ () -> "Batch at offset 0 was not evicted after DLQ archival. Timer
size: " +
+ sharePartition.timer().size() + ", cachedState keys: " +
sharePartition.cachedState().keySet());
+
+ // Batch at offset 0 is evicted from cache after reaching ARCHIVED and
start offset advancing.
+ assertEquals(10, sharePartition.startOffset());
+ // deliveryCompleteCount is 0 because eviction subtracts the count.
+ assertEquals(0, sharePartition.deliveryCompleteCount());
+ // Second batch remains AVAILABLE.
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(10L).batchState());
+ }
+
+ @Test
+ public void testAcquisitionLockTimeoutWithDlqEnabledPerOffsetBatch()
throws InterruptedException {
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+ .withMaxDeliveryCount(2)
+ .withState(SharePartitionState.ACTIVE)
+ .withShareGroupDlqEnableSupplier(() -> true)
+ .build();
+
+ // Acquire a batch of 10 records (offsets 0-9).
+ fetchAcquiredRecords(sharePartition, memoryRecords(10), 10);
+ assertEquals(1, sharePartition.timer().size());
+
+ // First timeout: all go to AVAILABLE with delivery count 1.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+ TestUtils.waitForCondition(
+ () -> sharePartition.timer().size() == 0 &&
+ sharePartition.cachedState().get(0L).batchState() ==
RecordState.AVAILABLE &&
+ sharePartition.cachedState().get(0L).batchDeliveryCount() == 1,
+ DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+ () -> assertionFailedMessage(sharePartition, Map.of(0L,
List.of())));
+
+ // Re-acquire only the first 5 records (offsets 0-4), forcing offset
state initialization.
+ fetchAcquiredRecords(sharePartition, memoryRecords(5), 5);
+
+ // Offsets 0-4 should be ACQUIRED with delivery count 2, offsets 5-9
remain AVAILABLE.
+ assertNotNull(sharePartition.cachedState().get(0L).offsetState());
+ for (long i = 0; i < 5; i++) {
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(0L).offsetState().get(i).state());
+ assertEquals(2,
sharePartition.cachedState().get(0L).offsetState().get(i).deliveryCount());
+ }
+ for (long i = 5; i < 10; i++) {
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(0L).offsetState().get(i).state());
+ assertEquals(1,
sharePartition.cachedState().get(0L).offsetState().get(i).deliveryCount());
+ }
+
+ // Second timeout for offsets 0-4: delivery count 2 >=
maxDeliveryCount 2.
+ // tryUpdateState redirects AVAILABLE -> ARCHIVING. Phase 2: ARCHIVING
-> ARCHIVED.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+ TestUtils.waitForCondition(
+ () -> {
+ Map<Long, InFlightState> offsetState =
sharePartition.cachedState().get(0L).offsetState();
+ if (offsetState == null) return false;
+ for (long i = 0; i < 5; i++) {
+ if (offsetState.get(i).state() != RecordState.ARCHIVED ||
offsetState.get(i).deliveryCount() != 2) {
+ return false;
+ }
+ }
+ for (long i = 5; i < 10; i++) {
+ if (offsetState.get(i).state() != RecordState.AVAILABLE ||
offsetState.get(i).deliveryCount() != 1) {
+ return false;
+ }
+ }
+ return sharePartition.timer().size() == 0;
+ },
+ DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+ () -> assertionFailedMessage(sharePartition, Map.of(0L,
List.of(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L))));
+
+ // Offsets 0-4 are ARCHIVED, 5-9 are AVAILABLE. Next fetch offset
moves to 5
+ // since offsets 0-4 are no longer fetchable.
+ assertEquals(5, sharePartition.nextFetchOffset());
+ }
+
+ @Test
+ public void testAcquisitionLockTimeoutWithDlqDisabledCompleteBatch()
throws InterruptedException {
+ // Verify that without DLQ, max delivery count still causes ARCHIVED
(not ARCHIVING).
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+ .withMaxDeliveryCount(2)
+ .withState(SharePartitionState.ACTIVE)
+ .withShareGroupDlqEnableSupplier(() -> false)
+ .build();
+
+ // Acquire two batches.
+ fetchAcquiredRecords(sharePartition, memoryRecords(10), 10);
+ fetchAcquiredRecords(sharePartition, memoryRecords(10, 10), 10);
+
+ // First timeout: records go to AVAILABLE.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+ TestUtils.waitForCondition(
+ () -> sharePartition.timer().size() == 0 &&
+ sharePartition.cachedState().get(0L).batchState() ==
RecordState.AVAILABLE,
+ DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+ () -> assertionFailedMessage(sharePartition, Map.of(0L,
List.of())));
+
+ // Re-acquire first batch.
+ fetchAcquiredRecords(sharePartition, memoryRecords(10), 10);
+
+ // Second timeout: delivery count reaches max, goes directly to
ARCHIVED (no ARCHIVING).
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+ TestUtils.waitForCondition(
+ () -> sharePartition.timer().size() == 0 &&
+ sharePartition.cachedState().get(0L) == null,
+ DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+ () -> "Batch at offset 0 was not evicted after archival. Timer
size: " +
+ sharePartition.timer().size() + ", cachedState keys: " +
sharePartition.cachedState().keySet());
+
+ // Batch evicted, start offset advances.
+ assertEquals(10, sharePartition.startOffset());
+ assertEquals(0, sharePartition.deliveryCompleteCount());
+ }
+
+ @Test
+ public void testAcquisitionLockTimeoutWithDlqEnabledMixedOffsets() throws
InterruptedException {
+ // Test where some offsets in a batch exceed max delivery count and
some don't.
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+ .withMaxDeliveryCount(2)
+ .withState(SharePartitionState.ACTIVE)
+ .withShareGroupDlqEnableSupplier(() -> true)
+ .build();
+
+ // Acquire batch of 10 records (offsets 0-9).
+ fetchAcquiredRecords(sharePartition, memoryRecords(10), 10);
+
+ // Timeout #1: all go to AVAILABLE, delivery count 1.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+ TestUtils.waitForCondition(
+ () -> sharePartition.timer().size() == 0 &&
+ sharePartition.cachedState().get(0L).batchState() ==
RecordState.AVAILABLE &&
+ sharePartition.cachedState().get(0L).batchDeliveryCount() == 1,
+ DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+ () -> assertionFailedMessage(sharePartition, Map.of(0L,
List.of())));
+
+ // Re-acquire only offsets 0-4. This forces offset state
initialization.
+ fetchAcquiredRecords(sharePartition, memoryRecords(5), 5);
+
+ // Acknowledge all re-acquired offsets 0-4 as ACCEPT (five ACCEPT entries).
+ // None of offsets 0-4 remain ACQUIRED after this call.
+ CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+ MEMBER_ID,
+ List.of(new ShareAcknowledgementBatch(0, 4, List.of(
+ AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id,
AcknowledgeType.ACCEPT.id,
+ AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id))));
+ assertNull(ackResult.join());
+
+ // Re-acquire offsets 5-9. These will have delivery count 2.
+ fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
+
+ // Now offsets 0-4: ACKNOWLEDGED (delivery count 2), offsets 5-9:
ACQUIRED (delivery count 2).
+ // Timeout #2 for offsets 5-9: delivery count 2 >= max 2 → ARCHIVING →
ARCHIVED.
+ // Once all offsets reach terminal state, the batch is evicted from
cache.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+ TestUtils.waitForCondition(
+ () -> sharePartition.timer().size() == 0 &&
+ sharePartition.cachedState().get(0L) == null,
+ DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+ () -> "Batch at offset 0 was not evicted after DLQ archival. Timer
size: " +
+ sharePartition.timer().size() + ", cachedState keys: " +
sharePartition.cachedState().keySet());
+ }
+
+ @Test
+ public void
testAcquisitionLockTimeoutWithDlqEnabledWriteFailureCompleteBatch() throws
InterruptedException {
+ // Phase 1 persist of ARCHIVING fails, but phase 2 still proceeds
unconditionally
+ // because timeout path uses tryUpdateState (no rollback).
+ Persister persister = Mockito.mock(Persister.class);
+ mockPersisterReadStateMethod(persister);
+
+ // Every writeState call returns the same error result
+ // (GROUP_ID_NOT_FOUND); no call is stubbed to succeed.
+ WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0,
Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message())))));
+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withPersister(persister)
+ .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+ .withMaxDeliveryCount(2)
+ .withState(SharePartitionState.ACTIVE)
+ .withShareGroupDlqEnableSupplier(() -> true)
+ .build();
+
+ // Acquire two batches.
+ fetchAcquiredRecords(sharePartition, memoryRecords(10), 10);
+ fetchAcquiredRecords(sharePartition, memoryRecords(10, 10), 10);
+ assertEquals(2, sharePartition.timer().size());
+
+ // First timeout: delivery count 1 < max 2, records go to AVAILABLE.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+ TestUtils.waitForCondition(
+ () -> sharePartition.timer().size() == 0 &&
+ sharePartition.cachedState().get(0L).batchState() ==
RecordState.AVAILABLE &&
+ sharePartition.cachedState().get(0L).batchDeliveryCount() == 1,
+ DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+ () -> assertionFailedMessage(sharePartition, Map.of(0L, List.of(),
10L, List.of())));
+
+ // Re-acquire first batch, delivery count becomes 2 (== max).
+ fetchAcquiredRecords(sharePartition, memoryRecords(10), 10);
+ assertEquals(2,
sharePartition.cachedState().get(0L).batchDeliveryCount());
+
+ // Second timeout: tryUpdateState redirects AVAILABLE → ARCHIVING.
+ // Phase 1 persist fails. Phase 2 proceeds (DLQ enqueue +
tryUpdateState(ARCHIVED)).
+ // Phase 2 persist also fails, but since isTimeout=true, no rollback —
ARCHIVED stays in memory.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+ TestUtils.waitForCondition(
+ () -> sharePartition.timer().size() == 0 &&
+ sharePartition.cachedState().get(0L) == null,
+ DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+ () -> "Batch at offset 0 was not evicted. Timer size: " +
+ sharePartition.timer().size() + ", cachedState keys: " +
sharePartition.cachedState().keySet());
+
+ // Despite both persists failing, batch reached ARCHIVED in memory (no
rollback for timeout)
+ // and was evicted from cache.
+ assertEquals(10, sharePartition.startOffset());
+ assertEquals(0, sharePartition.deliveryCompleteCount());
+ // Second batch went to AVAILABLE (delivery count 1 < max).
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(10L).batchState());
+ }
+
+ @Test
+ public void
testAcquisitionLockTimeoutWithDlqEnabledWriteFailurePerOffsetBatch() throws
InterruptedException {
+ // Phase 1 persist of ARCHIVING fails for per-offset batch, but phase
2 still proceeds.
+ // Only offsets 0-4 are re-acquired below, so only they reach the max
+ // delivery count; offsets 5-9 stay at delivery count 1.
+ Persister persister = Mockito.mock(Persister.class);
+ mockPersisterReadStateMethod(persister);
+
+ // All write calls return error.
+ WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0,
Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message())))));
+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withPersister(persister)
+ .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+ .withMaxDeliveryCount(2)
+ .withState(SharePartitionState.ACTIVE)
+ .withShareGroupDlqEnableSupplier(() -> true)
+ .build();
+
+ // Acquire batch of 10 records (offsets 0-9).
+ fetchAcquiredRecords(sharePartition, memoryRecords(10), 10);
+ assertEquals(1, sharePartition.timer().size());
+
+ // First timeout: all go to AVAILABLE, delivery count 1.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+ TestUtils.waitForCondition(
+ () -> sharePartition.timer().size() == 0 &&
+ sharePartition.cachedState().get(0L).batchState() ==
RecordState.AVAILABLE &&
+ sharePartition.cachedState().get(0L).batchDeliveryCount() == 1,
+ DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+ () -> assertionFailedMessage(sharePartition, Map.of(0L,
List.of())));
+
+ // Re-acquire only first 5 offsets, forcing offset state
initialization.
+ fetchAcquiredRecords(sharePartition, memoryRecords(5), 5);
+ assertNotNull(sharePartition.cachedState().get(0L).offsetState());
+ for (long i = 0; i < 5; i++) {
+ assertEquals(2,
sharePartition.cachedState().get(0L).offsetState().get(i).deliveryCount());
+ }
+
+ // Second timeout for offsets 0-4: delivery count 2 >= max 2 →
ARCHIVING.
+ // Phase 1 persist fails. Phase 2 proceeds (tryUpdateState(ARCHIVED),
no rollback).
+ // Phase 2 persist also fails, but ARCHIVED stays in memory.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+ TestUtils.waitForCondition(
+ () -> {
+ Map<Long, InFlightState> offsetState =
sharePartition.cachedState().get(0L).offsetState();
+ if (offsetState == null) return false;
+ for (long i = 0; i < 5; i++) {
+ if (offsetState.get(i).state() != RecordState.ARCHIVED ||
offsetState.get(i).deliveryCount() != 2) {
+ return false;
+ }
+ }
+ for (long i = 5; i < 10; i++) {
+ if (offsetState.get(i).state() != RecordState.AVAILABLE ||
offsetState.get(i).deliveryCount() != 1) {
+ return false;
+ }
+ }
+ return sharePartition.timer().size() == 0;
+ },
+ DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+ () -> assertionFailedMessage(sharePartition, Map.of(0L,
List.of(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L))));
+
+ // Despite both persists failing, offsets 0-4 reached ARCHIVED in
memory (no rollback for timeout).
+ // Offsets 5-9 remain AVAILABLE.
+ // The next fetch offset is therefore 5, the first AVAILABLE offset.
+ assertEquals(5, sharePartition.nextFetchOffset());
+ }
+
+ @Test
+ public void
testAcquisitionLockTimeoutWithDlqPhase1FailsPhase2SucceedsCompleteBatch()
throws InterruptedException {
+ // Phase 1 persist (ARCHIVING) fails, phase 2 persist (ARCHIVED)
succeeds.
+ // Records should reach ARCHIVED despite phase 1 failure.
+ Persister persister = Mockito.mock(Persister.class);
+ mockPersisterReadStateMethod(persister);
+
+ WriteShareGroupStateResult failureResult =
Mockito.mock(WriteShareGroupStateResult.class);
+ Mockito.when(failureResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0,
Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message())))));
+
+ WriteShareGroupStateResult successResult =
Mockito.mock(WriteShareGroupStateResult.class);
+ Mockito.when(successResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(),
Errors.NONE.message())))));
+
+ // Initial stubbing: the first writeState call fails, later calls succeed.
+ // The first (non-DLQ) timeout below consumes the failing call, so the
+ // stub is re-armed before the second timeout to make phase 1 fail.
+ Mockito.when(persister.writeState(Mockito.any()))
+ .thenReturn(CompletableFuture.completedFuture(failureResult))
+ .thenReturn(CompletableFuture.completedFuture(successResult));
+
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withPersister(persister)
+ .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+ .withMaxDeliveryCount(2)
+ .withState(SharePartitionState.ACTIVE)
+ .withShareGroupDlqEnableSupplier(() -> true)
+ .build();
+
+ // Acquire two batches.
+ fetchAcquiredRecords(sharePartition, memoryRecords(10), 10);
+ fetchAcquiredRecords(sharePartition, memoryRecords(10, 10), 10);
+ assertEquals(2, sharePartition.timer().size());
+
+ // First timeout: delivery count 1 < max 2, records go to AVAILABLE.
+ // This also calls writeState (fails), but timeout path commits
unconditionally.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+ TestUtils.waitForCondition(
+ () -> sharePartition.timer().size() == 0 &&
+ sharePartition.cachedState().get(0L).batchState() ==
RecordState.AVAILABLE &&
+ sharePartition.cachedState().get(0L).batchDeliveryCount() == 1,
+ DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+ () -> assertionFailedMessage(sharePartition, Map.of(0L, List.of(),
10L, List.of())));
+
+ // Re-acquire first batch, delivery count becomes 2 (== max).
+ fetchAcquiredRecords(sharePartition, memoryRecords(10), 10);
+ assertEquals(2,
sharePartition.cachedState().get(0L).batchDeliveryCount());
+
+ // Reset mock: phase 1 fails, phase 2 succeeds.
+ Mockito.when(persister.writeState(Mockito.any()))
+ .thenReturn(CompletableFuture.completedFuture(failureResult))
+ .thenReturn(CompletableFuture.completedFuture(successResult));
+
+ // Second timeout: ARCHIVING (phase 1 persist fails), phase 2 proceeds
and succeeds.
+ // DLQ enqueue + ARCHIVING → ARCHIVED persisted successfully.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+ TestUtils.waitForCondition(
+ () -> sharePartition.timer().size() == 0 &&
+ sharePartition.cachedState().get(0L) == null,
+ DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+ () -> "Batch at offset 0 was not evicted. Timer size: " +
+ sharePartition.timer().size() + ", cachedState keys: " +
sharePartition.cachedState().keySet());
+
+ // Phase 1 failed but phase 2 succeeded — batch reached ARCHIVED and
was evicted.
+ assertEquals(10, sharePartition.startOffset());
+ assertEquals(0, sharePartition.deliveryCompleteCount());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(10L).batchState());
+ }
+
+ @Test
+ public void
testAcquisitionLockTimeoutWithDlqPhase1FailsPhase2SucceedsPerOffsetBatch()
throws InterruptedException {
+ // Phase 1 persist (ARCHIVING) fails, phase 2 persist (ARCHIVED)
succeeds for per-offset batch.
+ Persister persister = Mockito.mock(Persister.class);
+ mockPersisterReadStateMethod(persister);
+
+ WriteShareGroupStateResult failureResult =
Mockito.mock(WriteShareGroupStateResult.class);
+ Mockito.when(failureResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0,
Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message())))));
+
+ WriteShareGroupStateResult successResult =
Mockito.mock(WriteShareGroupStateResult.class);
+ Mockito.when(successResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(),
Errors.NONE.message())))));
+
+ // First writeState call fails, subsequent calls succeed.
+ // This is only the initial stubbing; the same fail-then-succeed stub
+ // is installed again below, just before the second timeout.
+ Mockito.when(persister.writeState(Mockito.any()))
+ .thenReturn(CompletableFuture.completedFuture(failureResult))
+ .thenReturn(CompletableFuture.completedFuture(successResult));
+
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withPersister(persister)
+ .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+ .withMaxDeliveryCount(2)
+ .withState(SharePartitionState.ACTIVE)
+ .withShareGroupDlqEnableSupplier(() -> true)
+ .build();
+
+ // Acquire batch of 10 records (offsets 0-9).
+ fetchAcquiredRecords(sharePartition, memoryRecords(10), 10);
+ assertEquals(1, sharePartition.timer().size());
+
+ // First timeout: all go to AVAILABLE, delivery count 1.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+ TestUtils.waitForCondition(
+ () -> sharePartition.timer().size() == 0 &&
+ sharePartition.cachedState().get(0L).batchState() ==
RecordState.AVAILABLE &&
+ sharePartition.cachedState().get(0L).batchDeliveryCount() == 1,
+ DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+ () -> assertionFailedMessage(sharePartition, Map.of(0L,
List.of())));
+
+ // Re-acquire only first 5 offsets, forcing offset state
initialization.
+ fetchAcquiredRecords(sharePartition, memoryRecords(5), 5);
+ assertNotNull(sharePartition.cachedState().get(0L).offsetState());
+ for (long i = 0; i < 5; i++) {
+ assertEquals(2,
sharePartition.cachedState().get(0L).offsetState().get(i).deliveryCount());
+ }
+
+ // Reset mock: phase 1 fails, phase 2 calls succeed.
+ Mockito.when(persister.writeState(Mockito.any()))
+ .thenReturn(CompletableFuture.completedFuture(failureResult))
+ .thenReturn(CompletableFuture.completedFuture(successResult));
+
+ // Second timeout for offsets 0-4: delivery count 2 >= max 2 →
ARCHIVING.
+ // Phase 1 persist fails, but phase 2 succeeds for each offset →
ARCHIVED.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+ TestUtils.waitForCondition(
+ () -> {
+ Map<Long, InFlightState> offsetState =
sharePartition.cachedState().get(0L).offsetState();
+ if (offsetState == null) return false;
+ for (long i = 0; i < 5; i++) {
+ if (offsetState.get(i).state() != RecordState.ARCHIVED ||
offsetState.get(i).deliveryCount() != 2) {
+ return false;
+ }
+ }
+ for (long i = 5; i < 10; i++) {
+ if (offsetState.get(i).state() != RecordState.AVAILABLE ||
offsetState.get(i).deliveryCount() != 1) {
+ return false;
+ }
+ }
+ return sharePartition.timer().size() == 0;
+ },
+ DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+ () -> assertionFailedMessage(sharePartition, Map.of(0L,
List.of(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L))));
+
+ // Phase 1 failed but phase 2 succeeded — offsets 0-4 reached ARCHIVED.
+ // Offsets 5-9 remain AVAILABLE.
+ assertEquals(5, sharePartition.nextFetchOffset());
+ }
+
+ // Unit tests that call the initiateDLQAndArchive method directly.
+
+ // Argument rows for testInitiateDLQAndArchive, in the column order
+ // listed inside the method. Every row, including the persist-failure
+ // row, expects the state to end as ARCHIVED.
+ private static Stream<Arguments> initiateDLQAndArchiveParameters() {
+ return Stream.of(
+ // name,
persistSucceeds, expectedState, dlqCause,
firstOffset, lastOffset, deliveryCount
+ Arguments.of("persist succeeds", true,
RecordState.ARCHIVED, ShareGroupDLQ.DELIVERY_COUNT_EXCEEDED, 0L,
9L, (short) 2),
+ Arguments.of("persist fails, no rollback", false,
RecordState.ARCHIVED, ShareGroupDLQ.DELIVERY_COUNT_EXCEEDED, 0L,
9L, (short) 2),
+ Arguments.of("client reject cause", true,
RecordState.ARCHIVED, ShareGroupDLQ.CLIENT_REJECT, 5L,
5L, (short) 1),
+ Arguments.of("null cause", true,
RecordState.ARCHIVED, null, 0L,
4L, (short) 1),
+ Arguments.of("single offset", true,
RecordState.ARCHIVED, ShareGroupDLQ.DELIVERY_COUNT_EXCEEDED, 7L,
7L, (short) 3),
+ Arguments.of("delivery count exceeded cause", true,
RecordState.ARCHIVED, ShareGroupDLQ.DELIVERY_COUNT_EXCEEDED, 10L,
19L, (short) 5)
+ );
+ }
+
+ // Calls initiateDLQAndArchive on a standalone ARCHIVING state and checks
+ // the resulting in-memory state and the single writeState request.
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("initiateDLQAndArchiveParameters")
+ public void testInitiateDLQAndArchive(String name, boolean persistSucceeds,
+ RecordState expectedState, Throwable
dlqCause,
+ long firstOffset, long lastOffset,
short deliveryCount) {
+ Persister persister = Mockito.mock(Persister.class);
+ mockPersisterReadStateMethod(persister);
+
+ WriteShareGroupStateResult writeResult =
Mockito.mock(WriteShareGroupStateResult.class);
+ Errors error = persistSucceeds ? Errors.NONE :
Errors.GROUP_ID_NOT_FOUND;
+ Mockito.when(writeResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0, error.code(),
error.message())))));
+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeResult));
+
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withPersister(persister)
+ .withState(SharePartitionState.ACTIVE)
+ .withShareGroupDlqEnableSupplier(() -> true)
+ .build();
+
+ InFlightState state = new InFlightState(RecordState.ARCHIVING,
deliveryCount, EMPTY_MEMBER_ID);
+
+ sharePartition.initiateDLQAndArchive(state, firstOffset, lastOffset,
deliveryCount, dlqCause);
+
+ assertEquals(expectedState, state.state());
+ assertFalse(state.hasOngoingStateTransition());
+
+ // Verify persister.writeState was called exactly once with the
correct state batch.
+ ArgumentCaptor<WriteShareGroupStateParameters> captor =
+ ArgumentCaptor.forClass(WriteShareGroupStateParameters.class);
+ Mockito.verify(persister,
Mockito.times(1)).writeState(captor.capture());
+
+ WriteShareGroupStateParameters params = captor.getValue();
+ GroupTopicPartitionData<PartitionStateBatchData> data =
+ params.groupTopicPartitionData();
+ assertEquals(GROUP_ID, data.groupId());
+ assertEquals(1, data.topicsData().size());
+ assertEquals(TOPIC_ID_PARTITION.topicId(),
data.topicsData().get(0).topicId());
+ assertEquals(1, data.topicsData().get(0).partitions().size());
+
+ PartitionStateBatchData partitionData =
data.topicsData().get(0).partitions().get(0);
+ assertEquals(1, partitionData.stateBatches().size());
+ PersisterStateBatch stateBatch = partitionData.stateBatches().get(0);
+ assertEquals(firstOffset, stateBatch.firstOffset());
+ assertEquals(lastOffset, stateBatch.lastOffset());
+ assertEquals(RecordState.ARCHIVED.id(), stateBatch.deliveryState());
+ assertEquals(deliveryCount, stateBatch.deliveryCount());
+
+ // Verify readState was not called by initiateDLQAndArchive.
+ Mockito.verify(persister, Mockito.never()).readState(Mockito.any());
+ }
+
private static ShareGroupConfigProvider configProviderWithRenewDisabled() {
ShareGroupConfigProvider configProvider =
Mockito.mock(ShareGroupConfigProvider.class);
Mockito.when(configProvider.isRenewAcknowledgeEnabled(GROUP_ID)).thenReturn(false);
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/NoOpShareGroupDLQManager.java
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/NoOpShareGroupDLQManager.java
index ea4f5367d1b..5d64499a4f9 100644
---
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/NoOpShareGroupDLQManager.java
+++
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/NoOpShareGroupDLQManager.java
@@ -32,7 +32,7 @@ public class NoOpShareGroupDLQManager implements
ShareGroupDLQManager {
@Override
public CompletableFuture<Void> enqueue(ShareGroupDLQRecordParameter param)
{
- log.trace("Enqueuing share group dlq record parameter: {}", param);
+ log.warn("Enqueuing share group dlq record parameter: {}", param);
return CompletableFuture.completedFuture(null);
}
diff --git
a/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightBatch.java
b/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightBatch.java
index 583a5477611..83fb32dd2aa 100644
---
a/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightBatch.java
+++
b/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightBatch.java
@@ -166,11 +166,12 @@ public class InFlightBatch {
* @param ops The behavior on the delivery count.
* @param maxDeliveryCount The maximum delivery count for the records.
* @param newMemberId The new member id for the records.
+ * @param dlqSupportEnabled Boolean indicating if share group DLQ support
is enabled.
* @return {@code InFlightState} if update succeeds, null otherwise.
Returning state helps update chaining.
* @throws IllegalStateException if the offset state is maintained and the
batch state is not available.
*/
- public InFlightState tryUpdateBatchState(RecordState newState,
DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
- return inFlightState().tryUpdateState(newState, ops, maxDeliveryCount,
newMemberId);
+ public InFlightState tryUpdateBatchState(RecordState newState,
DeliveryCountOps ops, int maxDeliveryCount, String newMemberId, boolean
dlqSupportEnabled) {
+ return inFlightState().tryUpdateState(newState, ops, maxDeliveryCount,
newMemberId, dlqSupportEnabled);
}
/**
@@ -181,13 +182,14 @@ public class InFlightBatch {
* @param ops The behavior on the delivery count.
* @param maxDeliveryCount The maximum delivery count for the records.
* @param newMemberId The new member id for the records.
+ * @param dlqSupportEnabled Boolean indicating if share group DLQ support
is enabled.
* @return {@code InFlightState} if update succeeds, null otherwise.
Returning state helps update chaining.
* @throws IllegalStateException if the offset state is maintained and the
batch state is not available.
*/
public InFlightState startBatchStateTransition(RecordState newState,
DeliveryCountOps ops, int maxDeliveryCount,
- String newMemberId
+ String newMemberId, boolean dlqSupportEnabled
) {
- return inFlightState().startStateTransition(newState, ops,
maxDeliveryCount, newMemberId);
+ return inFlightState().startStateTransition(newState, ops,
maxDeliveryCount, newMemberId, dlqSupportEnabled);
}
/**
diff --git
a/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java
b/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java
index 0ed99562ef8..c5e9cb5c1af 100644
---
a/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java
+++
b/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java
@@ -147,7 +147,7 @@ public class InFlightState {
* @return {@code InFlightState} if update succeeds, null otherwise.
Returning state
* helps update chaining.
*/
- public InFlightState tryUpdateState(RecordState newState, DeliveryCountOps
ops, int maxDeliveryCount, String newMemberId) {
+ public InFlightState tryUpdateState(RecordState newState, DeliveryCountOps
ops, int maxDeliveryCount, String newMemberId, boolean dlqSupportEnabled) {
try {
// If the state transition is in progress, the state should not be
updated.
if (hasOngoingStateTransition()) {
@@ -161,7 +161,7 @@ public class InFlightState {
}
if (newState == RecordState.AVAILABLE && ops !=
DeliveryCountOps.DECREASE && deliveryCount >= maxDeliveryCount) {
- newState = RecordState.ARCHIVED;
+ newState = dlqSupportEnabled ? RecordState.ARCHIVING :
RecordState.ARCHIVED;
}
state = state.validateTransition(newState);
if (newState != RecordState.ARCHIVED) {
@@ -200,9 +200,9 @@ public class InFlightState {
* @return {@code InFlightState} if update succeeds, null otherwise.
Returning state
* helps update chaining.
*/
- public InFlightState startStateTransition(RecordState newState,
DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
+ public InFlightState startStateTransition(RecordState newState,
DeliveryCountOps ops, int maxDeliveryCount, String newMemberId, boolean
dlqSupportEnabled) {
InFlightState currentState = new InFlightState(state, deliveryCount,
memberId, acquisitionLockTimeoutTask);
- InFlightState updatedState = tryUpdateState(newState, ops,
maxDeliveryCount, newMemberId);
+ InFlightState updatedState = tryUpdateState(newState, ops,
maxDeliveryCount, newMemberId, dlqSupportEnabled);
if (updatedState != null) {
rollbackState = new RollbackState(currentState, maxDeliveryCount);
}