This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6c0778aeaec KAFKA-20245: DLQ share group client REJECT ack records.
[2/N] (#21793)
6c0778aeaec is described below
commit 6c0778aeaec926cf7287842d9b4c91660bc47d6c
Author: Sushant Mahajan <[email protected]>
AuthorDate: Sat Apr 11 02:23:26 2026 +0530
KAFKA-20245: DLQ share group client REJECT ack records. [2/N] (#21793)
* This PR adds code in `SharePartition` to support dead-letter queueing (DLQ)
of records (per batch and per offset) that the client has acknowledged with REJECT.
* Appropriate unit tests have been added where applicable.
* The dynamic `ShareVersion` feature has been used to detect DLQ
support.
* The current implementation hardcodes `NoOpShareGroupDLQManager`. This will be
replaced with a concrete implementation in future PRs.
Reviewers: Andrew Schofield <[email protected]>
---
.../java/kafka/server/share/SharePartition.java | 134 +++++++++++++++++--
.../kafka/server/share/SharePartitionManager.java | 26 +++-
.../src/main/scala/kafka/server/BrokerServer.scala | 5 +-
.../server/share/SharePartitionManagerTest.java | 10 +-
.../kafka/server/share/SharePartitionTest.java | 143 ++++++++++++++++++++-
.../server/share/dlq/NoOpShareGroupDLQManager.java | 6 +
.../kafka/server/share/dlq/ShareGroupDLQ.java | 4 -
.../share/dlq/ShareGroupDLQRecordParameter.java | 2 +-
8 files changed, 304 insertions(+), 26 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index 28afd716960..6863cc4d91d 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -42,6 +42,9 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
import
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
+import org.apache.kafka.server.share.dlq.NoOpShareGroupDLQManager;
+import org.apache.kafka.server.share.dlq.ShareGroupDLQ;
+import org.apache.kafka.server.share.dlq.ShareGroupDLQRecordParameter;
import org.apache.kafka.server.share.fetch.AcquisitionLockTimeoutHandler;
import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
@@ -89,6 +92,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
import static kafka.server.share.ShareFetchUtils.offsetForEarliestTimestamp;
import static kafka.server.share.ShareFetchUtils.offsetForLatestTimestamp;
@@ -325,6 +329,16 @@ public class SharePartition {
*/
private long fetchLockIdleDurationMs;
+ /**
+ * Reference to the dlq manager implementation.
+ */
+ private final ShareGroupDLQ shareGroupDLQ = new NoOpShareGroupDLQManager();
+
+ /**
+ * Supplier to toggle dlq support.
+ */
+ private final Supplier<Boolean> shareGroupDlqEnableSupplier;
+
SharePartition(
String groupId,
TopicIdPartition topicIdPartition,
@@ -337,11 +351,12 @@ public class SharePartition {
Persister persister,
ReplicaManager replicaManager,
ShareGroupConfigProvider configProvider,
- SharePartitionListener listener
+ SharePartitionListener listener,
+ Supplier<Boolean> shareGroupDlqEnableSupplier
) {
this(groupId, topicIdPartition, leaderEpoch,
defaultMaxInFlightRecords, defaultMaxDeliveryCount, defaultRecordLockDurationMs,
timer, time, persister, replicaManager, configProvider,
SharePartitionState.EMPTY, listener,
- new SharePartitionMetrics(groupId, topicIdPartition.topic(),
topicIdPartition.partition()));
+ new SharePartitionMetrics(groupId, topicIdPartition.topic(),
topicIdPartition.partition()), shareGroupDlqEnableSupplier);
}
// Visible for testing
@@ -360,7 +375,8 @@ public class SharePartition {
ShareGroupConfigProvider configProvider,
SharePartitionState sharePartitionState,
SharePartitionListener listener,
- SharePartitionMetrics sharePartitionMetrics
+ SharePartitionMetrics sharePartitionMetrics,
+ Supplier<Boolean> shareGroupDlqEnableSupplier
) {
this.groupId = groupId;
this.topicIdPartition = topicIdPartition;
@@ -386,6 +402,7 @@ public class SharePartition {
this.timeoutHandler = releaseAcquisitionLockOnTimeout();
this.registerGaugeMetrics();
this.deliveryCompleteCount = new AtomicInteger(0);
+ this.shareGroupDlqEnableSupplier = shareGroupDlqEnableSupplier;
}
/**
@@ -1123,7 +1140,7 @@ public class SharePartition {
// Successfully updated the state of the offset and created a
persister state batch for write to persister.
persisterBatches.add(new PersisterBatch(updateResult, new
PersisterStateBatch(offsetState.getKey(),
- offsetState.getKey(), updateResult.state().id(), (short)
updateResult.deliveryCount())));
+ offsetState.getKey(), updateResult.state().id(), (short)
updateResult.deliveryCount()), null));
if (offsetState.getKey() >= startOffset &&
isStateTerminal(updateResult.state())) {
deliveryCompleteCount.incrementAndGet();
}
@@ -1164,7 +1181,7 @@ public class SharePartition {
// Successfully updated the state of the batch and created a
persister state batch for write to persister.
persisterBatches.add(new PersisterBatch(updateResult, new
PersisterStateBatch(inFlightBatch.firstOffset(),
- inFlightBatch.lastOffset(), updateResult.state().id(), (short)
updateResult.deliveryCount())));
+ inFlightBatch.lastOffset(), updateResult.state().id(), (short)
updateResult.deliveryCount()), null));
if (isStateTerminal(updateResult.state())) {
deliveryCompleteCount.addAndGet(numInFlightRecordsInBatch(inFlightBatch.firstOffset(),
inFlightBatch.lastOffset()));
}
@@ -2317,7 +2334,8 @@ public class SharePartition {
// by the client, then use the batch record state. This
will always be present as it is a static
// mapping between bytes and record state type. All ack
types have been added except for RENEW which
// has been handled above.
- RecordState recordState =
ACK_TYPE_TO_RECORD_STATE.get(ackType);
+ RecordState recordState = recordStateWithDlq(ackType);
+ Throwable dlqCause = recordState == RecordState.ARCHIVING
? ShareGroupDLQ.CLIENT_REJECT : null;
if (recordState == null) {
return Optional.of(new
IllegalArgumentException("Unknown acknowledge type id: " + ackType));
}
@@ -2336,9 +2354,10 @@ public class SharePartition {
return Optional.of(new InvalidRecordStateException(
"Unable to acknowledge records for the batch"));
}
+
// Successfully updated the state of the offset and
created a persister state batch for write to persister.
persisterBatches.add(new PersisterBatch(updateResult, new
PersisterStateBatch(offsetState.getKey(),
- offsetState.getKey(), updateResult.state().id(),
(short) updateResult.deliveryCount())));
+ offsetState.getKey(), updateResult.state().id(),
(short) updateResult.deliveryCount()), dlqCause));
if (isStateTerminal(updateResult.state())) {
deliveryCompleteCount.incrementAndGet();
}
@@ -2396,7 +2415,8 @@ public class SharePartition {
// The member id is reset to EMPTY_MEMBER_ID irrespective of the
ack type as the batch is
// either released or moved to a state where member id existence
is not important. The member id
// is only important when the batch is acquired.
- RecordState recordState = ACK_TYPE_TO_RECORD_STATE.get(ackType);
+ RecordState recordState = recordStateWithDlq(ackType);
+ Throwable dlqCause = recordState == RecordState.ARCHIVING ?
ShareGroupDLQ.CLIENT_REJECT : null;
if (recordState == null) {
return Optional.of(new IllegalArgumentException("Unknown
acknowledge type id: " + ackType));
}
@@ -2417,7 +2437,7 @@ public class SharePartition {
// Successfully updated the state of the batch and created a
persister state batch for write to persister.
persisterBatches.add(new PersisterBatch(updateResult, new
PersisterStateBatch(inFlightBatch.firstOffset(),
- inFlightBatch.lastOffset(), updateResult.state().id(), (short)
updateResult.deliveryCount())));
+ inFlightBatch.lastOffset(), updateResult.state().id(), (short)
updateResult.deliveryCount()), dlqCause));
if (isStateTerminal(updateResult.state())) {
deliveryCompleteCount.addAndGet((int)
(inFlightBatch.lastOffset() - inFlightBatch.firstOffset() + 1));
}
@@ -2527,12 +2547,96 @@ public class SharePartition {
log.trace("State change request successful for share
partition: {}-{}",
groupId, topicIdPartition);
- persisterBatches.forEach(persisterBatch -> {
+
+ List<PersisterBatch> nonDlqBatches = new
ArrayList<>(persisterBatches.size());
+ List<PersisterBatch> dlqBatches = new
ArrayList<>(persisterBatches.size());
+ for (PersisterBatch persisterBatch : persisterBatches) {
+ if (persisterBatch.updatedState.state() ==
RecordState.ARCHIVING) {
+ dlqBatches.add(persisterBatch);
+ } else {
+ nonDlqBatches.add(persisterBatch);
+ }
+ }
+
+ nonDlqBatches.forEach(persisterBatch -> {
persisterBatch.updatedState.completeStateTransition(true);
if (persisterBatch.updatedState.state() ==
RecordState.AVAILABLE) {
updateFindNextFetchOffset(true);
}
});
+
+ dlqBatches.forEach(persisterBatch -> {
+
persisterBatch.updatedState.completeStateTransition(true);
+ shareGroupDLQ.enqueue(new ShareGroupDLQRecordParameter(
+ groupId,
+ topicIdPartition,
+ persisterBatch.stateBatch.firstOffset(),
+ persisterBatch.stateBatch.lastOffset(),
+
Optional.of(persisterBatch.stateBatch.deliveryCount()),
+ Optional.ofNullable(persisterBatch.dlqCause),
+ false
+ )).whenComplete((v1, dlqException) -> {
+ PersisterStateBatch sb = persisterBatch.stateBatch;
+ if (dlqException != null) {
+ log.error("Failed to write to DLQ for share
partition: {}-{}, offsets {}-{}. "
+ + "Proceeding to ARCHIVED state
regardless.",
+ groupId, topicIdPartition,
sb.firstOffset(), sb.lastOffset(), dlqException);
+ }
+
+ PersisterBatch phase2Batch;
+ lock.writeLock().lock();
+ try {
+ // dlqBatch.updatedState() is the same
InFlightState object in the cache,
+ // now committed in ARCHIVING. Transition it
directly.
+ InFlightState updateResult =
persisterBatch.updatedState().startStateTransition(
+ RecordState.ARCHIVED,
+ DeliveryCountOps.NO_OP,
+ maxDeliveryCount(),
+ EMPTY_MEMBER_ID
+ );
+ if (updateResult == null) {
+ log.error("Unable to transition ARCHIVING
→ ARCHIVED for offsets {}-{} "
+ + "in share partition: {}-{}",
sb.firstOffset(), sb.lastOffset(),
+ groupId, topicIdPartition);
+ return;
+ }
+ phase2Batch = new PersisterBatch(updateResult,
new PersisterStateBatch(
+ sb.firstOffset(), sb.lastOffset(),
+ updateResult.state().id(), (short)
updateResult.deliveryCount()), null);
+ deliveryCompleteCount.addAndGet(
+
numInFlightRecordsInBatch(sb.firstOffset(), sb.lastOffset()));
+ } finally {
+ lock.writeLock().unlock();
+ }
+
+ // Second persist: ARCHIVING → ARCHIVED
+
writeShareGroupState(List.of(phase2Batch.stateBatch()))
+ .whenComplete((v2, phase2Exception) -> {
+ boolean phase2CacheUpdated = false;
+ lock.writeLock().lock();
+ try {
+ if (phase2Exception != null) {
+ log.error("Failed to persist
ARCHIVED state for DLQ phase 2, "
+ + "share partition: {}-{}.
Records remain in ARCHIVING.",
+ groupId, topicIdPartition,
phase2Exception);
+
phase2Batch.updatedState().completeStateTransition(false);
+ if
(isStateTerminal(RecordState.forId(phase2Batch.stateBatch().deliveryState()))
+ &&
!isStateTerminal(phase2Batch.updatedState().state())) {
+
deliveryCompleteCount.addAndGet(
+
-numInFlightRecordsInBatch(sb.firstOffset(), sb.lastOffset()));
+ }
+ return;
+ }
+
+
phase2Batch.updatedState().completeStateTransition(true);
+ phase2CacheUpdated =
maybeUpdateCachedStateAndOffsets();
+ } finally {
+ lock.writeLock().unlock();
+
maybeCompleteDelayedShareFetchRequest(phase2CacheUpdated);
+ }
+ });
+ });
+ });
// Update the cached state and start and end offsets after
acknowledging/releasing the acquired records.
cacheStateUpdated = maybeUpdateCachedStateAndOffsets();
future.complete(null);
@@ -3225,6 +3329,13 @@ public class SharePartition {
return batch.isTransactional() &&
abortedProducerIds.contains(batch.producerId());
}
+ private RecordState recordStateWithDlq(byte ackType) {
+ if (shareGroupDlqEnableSupplier.get() && AcknowledgeType.REJECT.id ==
ackType) {
+ return RecordState.ARCHIVING;
+ }
+ return ACK_TYPE_TO_RECORD_STATE.get(ackType);
+ }
+
// Visible for testing.
boolean containsAbortMarker(RecordBatch batch) {
if (!batch.isControlBatch())
@@ -3397,7 +3508,8 @@ public class SharePartition {
*/
private record PersisterBatch(
InFlightState updatedState,
- PersisterStateBatch stateBatch
+ PersisterStateBatch stateBatch,
+ Throwable dlqCause
) { }
/**
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java
b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index a6311c44bd8..3b4db2162ee 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -74,6 +74,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
+import java.util.function.Supplier;
/**
* The SharePartitionManager is responsible for managing the SharePartitions
and ShareSessions.
@@ -148,6 +149,11 @@ public class SharePartitionManager implements
AutoCloseable {
*/
private final BrokerTopicStats brokerTopicStats;
+ /**
+ * Supplier indicating whether DLQ support is enabled.
+ */
+ private final Supplier<Boolean> shareGroupDlqEnableSupplier;
+
public SharePartitionManager(
ReplicaManager replicaManager,
Time time,
@@ -158,7 +164,8 @@ public class SharePartitionManager implements AutoCloseable
{
long remoteFetchMaxWaitMs,
Persister persister,
ShareGroupConfigProvider configProvider,
- BrokerTopicStats brokerTopicStats
+ BrokerTopicStats brokerTopicStats,
+ Supplier<Boolean> shareGroupDlqEnableSupplier
) {
this(replicaManager,
time,
@@ -171,7 +178,8 @@ public class SharePartitionManager implements AutoCloseable
{
persister,
configProvider,
new ShareGroupMetrics(time),
- brokerTopicStats
+ brokerTopicStats,
+ shareGroupDlqEnableSupplier
);
}
@@ -187,7 +195,8 @@ public class SharePartitionManager implements AutoCloseable
{
Persister persister,
ShareGroupConfigProvider configProvider,
ShareGroupMetrics shareGroupMetrics,
- BrokerTopicStats brokerTopicStats
+ BrokerTopicStats brokerTopicStats,
+ Supplier<Boolean> shareGroupDlqEnableSupplier
) {
this(replicaManager,
time,
@@ -202,11 +211,13 @@ public class SharePartitionManager implements
AutoCloseable {
persister,
configProvider,
shareGroupMetrics,
- brokerTopicStats
+ brokerTopicStats,
+ shareGroupDlqEnableSupplier
);
}
// Visible for testing.
+ @SuppressWarnings("ParameterNumber")
SharePartitionManager(
ReplicaManager replicaManager,
Time time,
@@ -220,7 +231,8 @@ public class SharePartitionManager implements AutoCloseable
{
Persister persister,
ShareGroupConfigProvider configProvider,
ShareGroupMetrics shareGroupMetrics,
- BrokerTopicStats brokerTopicStats
+ BrokerTopicStats brokerTopicStats,
+ Supplier<Boolean> shareGroupDlqEnableSupplier
) {
this.replicaManager = replicaManager;
this.time = time;
@@ -236,6 +248,7 @@ public class SharePartitionManager implements AutoCloseable
{
this.shareGroupMetrics = shareGroupMetrics;
this.brokerTopicStats = brokerTopicStats;
this.cache.registerShareGroupListener(new ShareGroupListenerImpl());
+ this.shareGroupDlqEnableSupplier = shareGroupDlqEnableSupplier;
}
/**
@@ -719,7 +732,8 @@ public class SharePartitionManager implements AutoCloseable
{
persister,
replicaManager,
configProvider,
- listener
+ listener,
+ shareGroupDlqEnableSupplier
);
});
}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index fe0fd1d2ec5..5f190a18869 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -46,7 +46,7 @@ import org.apache.kafka.metadata.publisher.{AclPublisher,
DelegationTokenPublish
import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
import org.apache.kafka.server.FetchSession.FetchSessionCache
import org.apache.kafka.server.authorizer.Authorizer
-import org.apache.kafka.server.common.{ApiMessageAndVersion,
DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
+import org.apache.kafka.server.common.{ApiMessageAndVersion,
DirectoryEventHandler, NodeToControllerChannelManager, ShareVersion,
TopicIdPartition}
import org.apache.kafka.server.config.{ConfigType,
DelegationTokenManagerConfigs}
import org.apache.kafka.server.log.remote.metadata.storage.BrokerReadyCallback
import org.apache.kafka.server.log.remote.storage.{RemoteLogManager,
RemoteLogManagerConfig}
@@ -459,7 +459,8 @@ class BrokerServer(
config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong,
persister,
new ShareGroupConfigProvider(groupConfigManager),
- brokerTopicStats
+ brokerTopicStats,
+ () =>
ShareVersion.fromFeatureLevel(metadataCache.features.finalizedFeatures.getOrDefault(ShareVersion.FEATURE_NAME,
0.toShort)).supportsShareGroupDLQ()
)
dataPlaneRequestProcessor = new KafkaApis(
diff --git
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index c6c96f79230..80f941d488e 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -112,6 +112,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
+import java.util.function.Supplier;
import scala.Tuple2;
import scala.collection.Seq;
@@ -3246,6 +3247,7 @@ public class SharePartitionManagerTest {
private Timer timer = new MockTimer();
private ShareGroupMetrics shareGroupMetrics = new
ShareGroupMetrics(time);
private BrokerTopicStats brokerTopicStats;
+ private Supplier<Boolean> shareGroupDlqEnableSupplier = () -> false;
private SharePartitionManagerBuilder withReplicaManager(ReplicaManager
replicaManager) {
this.replicaManager = replicaManager;
@@ -3282,6 +3284,11 @@ public class SharePartitionManagerTest {
return this;
}
+ private SharePartitionManagerBuilder
withShareGroupDlqEnableSupplier(Supplier<Boolean> shareGroupDlqEnableSupplier) {
+ this.shareGroupDlqEnableSupplier = shareGroupDlqEnableSupplier;
+ return this;
+ }
+
public static SharePartitionManagerBuilder builder() {
return new SharePartitionManagerBuilder();
}
@@ -3299,7 +3306,8 @@ public class SharePartitionManagerTest {
persister,
new ShareGroupConfigProvider(mock(GroupConfigManager.class)),
shareGroupMetrics,
- brokerTopicStats
+ brokerTopicStats,
+ shareGroupDlqEnableSupplier
);
}
}
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 24d37dc0700..c850f9e85a6 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -96,6 +96,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import static kafka.server.share.SharePartition.EMPTY_MEMBER_ID;
import static
org.apache.kafka.server.share.fetch.ShareFetchTestUtils.memoryRecordsBuilder;
@@ -12602,6 +12603,140 @@ public class SharePartitionTest {
assertFalse(sharePartition.cachedState().isEmpty());
}
+ @Test
+ public void testAcknowledgeRejectWithDlqEnabled() {
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .withShareGroupDlqEnableSupplier(() -> true)
+ .build();
+
+ // Acquire 2 batches so that the first one stays in cache after being
archived.
+ MemoryRecords records1 = memoryRecords(5, 5);
+ MemoryRecords records2 = memoryRecords(10, 5);
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition, records1, 5);
+ assertEquals(1, acquiredRecordsList.size());
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition, records2,
5);
+ assertEquals(1, acquiredRecordsList.size());
+
+ // Acknowledge the first batch with REJECT.
+ CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+ MEMBER_ID,
+ List.of(new ShareAcknowledgementBatch(5, 9,
List.of(AcknowledgeType.REJECT.id))));
+ assertNull(ackResult.join());
+ assertFalse(ackResult.isCompletedExceptionally());
+
+ // With NoOp DLQ manager and NoOp persister, the full 2-phase flow
completes synchronously:
+ // ACQUIRED -> ARCHIVING (phase 1 persist) -> DLQ enqueue -> ARCHIVED
(phase 2 persist).
+ // The final state should be ARCHIVED.
+ assertNull(sharePartition.cachedState().get(5L));
+ assertEquals(1, sharePartition.cachedState().size());
+
+ // The second batch should remain acquired.
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(10L).batchState());
+
+ // Start offset advances past the archived batch.
+ assertEquals(10, sharePartition.startOffset());
+
+ // deliveryCompleteCount is 0 as evicted records are subtracted.
+ assertEquals(0, sharePartition.deliveryCompleteCount());
+ }
+
+ @Test
+ public void testAcknowledgeRejectWithDlqDisabled() {
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .withShareGroupDlqEnableSupplier(() -> false)
+ .build();
+
+ MemoryRecords records1 = memoryRecords(5, 5);
+ MemoryRecords records2 = memoryRecords(10, 5);
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition, records1, 5);
+ assertEquals(1, acquiredRecordsList.size());
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition, records2,
5);
+ assertEquals(1, acquiredRecordsList.size());
+
+ // Acknowledge the first batch with REJECT when DLQ is disabled.
+ CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+ MEMBER_ID,
+ List.of(new ShareAcknowledgementBatch(5, 9,
List.of(AcknowledgeType.REJECT.id))));
+ assertNull(ackResult.join());
+ assertFalse(ackResult.isCompletedExceptionally());
+
+ // Without DLQ, REJECT goes directly to ARCHIVED (no ARCHIVING
intermediate state).
+ // Once ARCHIVED, the batch at start offset is evicted from cache.
+ assertEquals(1, sharePartition.cachedState().size());
+ assertNull(sharePartition.cachedState().get(5L));
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(10L).batchState());
+ assertEquals(10, sharePartition.startOffset());
+ assertEquals(0, sharePartition.deliveryCompleteCount());
+ }
+
+ @Test
+ public void testAcknowledgePerOffsetRejectWithDlqEnabled() {
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .withShareGroupDlqEnableSupplier(() -> true)
+ .build();
+
+ // Acquire a batch with 5 records (offsets 0-4) and a second batch to
keep cache populated.
+ MemoryRecords records1 = memoryRecords(5);
+ MemoryRecords records2 = memoryRecords(5, 5);
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition, records1, 5);
+ assertEquals(1, acquiredRecordsList.size());
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition, records2,
5);
+ assertEquals(1, acquiredRecordsList.size());
+
+ // Acknowledge with per-offset ack types: ACCEPT for 0-2, REJECT for
3-4.
+ CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+ MEMBER_ID,
+ List.of(new ShareAcknowledgementBatch(0, 4, List.of(
+ AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id,
AcknowledgeType.ACCEPT.id,
+ AcknowledgeType.REJECT.id, AcknowledgeType.REJECT.id))));
+ assertNull(ackResult.join());
+ assertFalse(ackResult.isCompletedExceptionally());
+
+ // With NoOp DLQ + NoOp persister, full 2-phase flow completes
synchronously.
+ // Batch should have offset state since offsets have different states.
+ // Once all offsets reach terminal state, the batch at start offset is
evicted.
+ assertNull(sharePartition.cachedState().get(0L));
+ assertEquals(1, sharePartition.cachedState().size());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(5L).batchState());
+ assertEquals(5, sharePartition.startOffset());
+ assertEquals(0, sharePartition.deliveryCompleteCount());
+ }
+
+ @Test
+ public void testAcknowledgePerOffsetRejectWithDlqDisabled() {
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .withShareGroupDlqEnableSupplier(() -> false)
+ .build();
+
+ MemoryRecords records1 = memoryRecords(5);
+ MemoryRecords records2 = memoryRecords(5, 5);
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition, records1, 5);
+ assertEquals(1, acquiredRecordsList.size());
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition, records2,
5);
+ assertEquals(1, acquiredRecordsList.size());
+
+ // Acknowledge with per-offset ack types: ACCEPT for 0-2, REJECT for
3-4.
+ CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+ MEMBER_ID,
+ List.of(new ShareAcknowledgementBatch(0, 4, List.of(
+ AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id,
AcknowledgeType.ACCEPT.id,
+ AcknowledgeType.REJECT.id, AcknowledgeType.REJECT.id))));
+ assertNull(ackResult.join());
+ assertFalse(ackResult.isCompletedExceptionally());
+
+ // Without DLQ, REJECT goes directly to ARCHIVED (no ARCHIVING
intermediate state).
+ // All offsets in first batch reach terminal state, so it is evicted.
+ assertNull(sharePartition.cachedState().get(0L));
+ assertEquals(1, sharePartition.cachedState().size());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(5L).batchState());
+ assertEquals(5, sharePartition.startOffset());
+ assertEquals(0, sharePartition.deliveryCompleteCount());
+ }
+
private static ShareGroupConfigProvider configProviderWithRenewDisabled() {
ShareGroupConfigProvider configProvider =
Mockito.mock(ShareGroupConfigProvider.class);
Mockito.when(configProvider.isRenewAcknowledgeEnabled(GROUP_ID)).thenReturn(false);
@@ -12623,6 +12758,7 @@ public class SharePartitionTest {
private SharePartitionState state = SharePartitionState.EMPTY;
private Time time = MOCK_TIME;
private SharePartitionMetrics sharePartitionMetrics =
Mockito.mock(SharePartitionMetrics.class);
+ private Supplier<Boolean> shareGroupDlqEnableSupplier = () -> false;
private SharePartitionBuilder withMaxInflightRecords(int
defaultMaxInflightRecords) {
this.defaultMaxInflightRecords = defaultMaxInflightRecords;
@@ -12669,6 +12805,11 @@ public class SharePartitionTest {
return this;
}
+ private SharePartitionBuilder
withShareGroupDlqEnableSupplier(Supplier<Boolean> shareGroupDlqEnableSupplier) {
+ this.shareGroupDlqEnableSupplier = shareGroupDlqEnableSupplier;
+ return this;
+ }
+
public static SharePartitionBuilder builder() {
return new SharePartitionBuilder();
}
@@ -12676,7 +12817,7 @@ public class SharePartitionTest {
public SharePartition build() {
return new SharePartition(GROUP_ID, TOPIC_ID_PARTITION, 0,
defaultMaxInflightRecords, defaultMaxDeliveryCount,
defaultAcquisitionLockTimeoutMs, mockTimer, time,
persister, replicaManager, configProvider,
- state, Mockito.mock(SharePartitionListener.class),
sharePartitionMetrics);
+ state, Mockito.mock(SharePartitionListener.class),
sharePartitionMetrics, shareGroupDlqEnableSupplier);
}
}
}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/NoOpShareGroupDLQManager.java
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/NoOpShareGroupDLQManager.java
index e7cb9efc615..cd7b971b47b 100644
---
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/NoOpShareGroupDLQManager.java
+++
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/NoOpShareGroupDLQManager.java
@@ -17,6 +17,9 @@
package org.apache.kafka.server.share.dlq;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.concurrent.CompletableFuture;
/**
@@ -25,8 +28,11 @@ import java.util.concurrent.CompletableFuture;
* a successfully completed future.
*/
public class NoOpShareGroupDLQManager implements ShareGroupDLQ {
+ private static final Logger log =
LoggerFactory.getLogger(NoOpShareGroupDLQManager.class);
+
@Override
public CompletableFuture<Void> enqueue(ShareGroupDLQRecordParameter param)
{
+ log.trace("Enqueuing share group dlq record parameter: {}", param);
return CompletableFuture.completedFuture(null);
}
}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQ.java
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQ.java
index 8b56cc64073..96bf848076c 100644
---
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQ.java
+++
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQ.java
@@ -30,12 +30,8 @@ public interface ShareGroupDLQ {
}
}
- Throwable STALE_BATCH = new ShareGroupDLQThrowable("Offset part of stale
batch.");
- Throwable BEHIND_LSO = new ShareGroupDLQThrowable("Offset before LSO.");
- Throwable ABORTED_TRANSACTION = new ShareGroupDLQThrowable("Offset part of
aborted transaction.");
Throwable CLIENT_REJECT = new ShareGroupDLQThrowable("Offset rejected by
client.");
Throwable DELIVERY_COUNT_EXCEEDED = new ShareGroupDLQThrowable("Offset
delivery count exceeded the threshold.");
- Throwable ACQUISITION_LOCK_TIMEOUT = new
ShareGroupDLQThrowable("Acquisition lock timed out.");
/**
* Main method exposed to the world to enqueuing a record to the share
groups dead letter queue.
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordParameter.java
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordParameter.java
index ba15f41b13f..465279d1229 100644
---
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordParameter.java
+++
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordParameter.java
@@ -38,7 +38,7 @@ public record ShareGroupDLQRecordParameter(
TopicIdPartition topicIdPartition,
long firstOffset,
long lastOffset,
- Optional<Integer> deliveryCount,
+ Optional<Short> deliveryCount,
Optional<Throwable> cause,
boolean preserveRecordData
) {