This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6c0778aeaec KAFKA-20245: DLQ share group client REJECT ack records. 
[2/N] (#21793)
6c0778aeaec is described below

commit 6c0778aeaec926cf7287842d9b4c91660bc47d6c
Author: Sushant Mahajan <[email protected]>
AuthorDate: Sat Apr 11 02:23:26 2026 +0530

    KAFKA-20245: DLQ share group client REJECT ack records. [2/N] (#21793)
    
    * PR adds code in `SharePartition` to support DLQ of records (batch and
    per offset) which have been REJECT acknowledged by the client.
    * Appropriate unit tests have been added where applicable.
    * The dynamic `ShareVersion` feature has been used to detect DLQ
    support.
    * Current impl hardcodes `NoOpShareGroupDLQManager`. This will be
    updated to specific implementation in future PRs.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../java/kafka/server/share/SharePartition.java    | 134 +++++++++++++++++--
 .../kafka/server/share/SharePartitionManager.java  |  26 +++-
 .../src/main/scala/kafka/server/BrokerServer.scala |   5 +-
 .../server/share/SharePartitionManagerTest.java    |  10 +-
 .../kafka/server/share/SharePartitionTest.java     | 143 ++++++++++++++++++++-
 .../server/share/dlq/NoOpShareGroupDLQManager.java |   6 +
 .../kafka/server/share/dlq/ShareGroupDLQ.java      |   4 -
 .../share/dlq/ShareGroupDLQRecordParameter.java    |   2 +-
 8 files changed, 304 insertions(+), 26 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index 28afd716960..6863cc4d91d 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -42,6 +42,9 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
 import 
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider;
 import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
+import org.apache.kafka.server.share.dlq.NoOpShareGroupDLQManager;
+import org.apache.kafka.server.share.dlq.ShareGroupDLQ;
+import org.apache.kafka.server.share.dlq.ShareGroupDLQRecordParameter;
 import org.apache.kafka.server.share.fetch.AcquisitionLockTimeoutHandler;
 import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
@@ -89,6 +92,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
 
 import static kafka.server.share.ShareFetchUtils.offsetForEarliestTimestamp;
 import static kafka.server.share.ShareFetchUtils.offsetForLatestTimestamp;
@@ -325,6 +329,16 @@ public class SharePartition {
      */
     private long fetchLockIdleDurationMs;
 
+    /**
+     * Reference to the dlq manager implementation.
+     */
+    private final ShareGroupDLQ shareGroupDLQ = new NoOpShareGroupDLQManager();
+
+    /**
+     * Supplier to toggle dlq support.
+     */
+    private final Supplier<Boolean> shareGroupDlqEnableSupplier;
+
     SharePartition(
         String groupId,
         TopicIdPartition topicIdPartition,
@@ -337,11 +351,12 @@ public class SharePartition {
         Persister persister,
         ReplicaManager replicaManager,
         ShareGroupConfigProvider configProvider,
-        SharePartitionListener listener
+        SharePartitionListener listener,
+        Supplier<Boolean> shareGroupDlqEnableSupplier
     ) {
         this(groupId, topicIdPartition, leaderEpoch, 
defaultMaxInFlightRecords, defaultMaxDeliveryCount, defaultRecordLockDurationMs,
             timer, time, persister, replicaManager, configProvider, 
SharePartitionState.EMPTY, listener,
-            new SharePartitionMetrics(groupId, topicIdPartition.topic(), 
topicIdPartition.partition()));
+            new SharePartitionMetrics(groupId, topicIdPartition.topic(), 
topicIdPartition.partition()), shareGroupDlqEnableSupplier);
     }
 
     // Visible for testing
@@ -360,7 +375,8 @@ public class SharePartition {
         ShareGroupConfigProvider configProvider,
         SharePartitionState sharePartitionState,
         SharePartitionListener listener,
-        SharePartitionMetrics sharePartitionMetrics
+        SharePartitionMetrics sharePartitionMetrics,
+        Supplier<Boolean> shareGroupDlqEnableSupplier
     ) {
         this.groupId = groupId;
         this.topicIdPartition = topicIdPartition;
@@ -386,6 +402,7 @@ public class SharePartition {
         this.timeoutHandler = releaseAcquisitionLockOnTimeout();
         this.registerGaugeMetrics();
         this.deliveryCompleteCount = new AtomicInteger(0);
+        this.shareGroupDlqEnableSupplier = shareGroupDlqEnableSupplier;
     }
 
     /**
@@ -1123,7 +1140,7 @@ public class SharePartition {
 
                 // Successfully updated the state of the offset and created a 
persister state batch for write to persister.
                 persisterBatches.add(new PersisterBatch(updateResult, new 
PersisterStateBatch(offsetState.getKey(),
-                    offsetState.getKey(), updateResult.state().id(), (short) 
updateResult.deliveryCount())));
+                    offsetState.getKey(), updateResult.state().id(), (short) 
updateResult.deliveryCount()), null));
                 if (offsetState.getKey() >= startOffset && 
isStateTerminal(updateResult.state())) {
                     deliveryCompleteCount.incrementAndGet();
                 }
@@ -1164,7 +1181,7 @@ public class SharePartition {
 
             // Successfully updated the state of the batch and created a 
persister state batch for write to persister.
             persisterBatches.add(new PersisterBatch(updateResult, new 
PersisterStateBatch(inFlightBatch.firstOffset(),
-                inFlightBatch.lastOffset(), updateResult.state().id(), (short) 
updateResult.deliveryCount())));
+                inFlightBatch.lastOffset(), updateResult.state().id(), (short) 
updateResult.deliveryCount()), null));
             if (isStateTerminal(updateResult.state())) {
                 
deliveryCompleteCount.addAndGet(numInFlightRecordsInBatch(inFlightBatch.firstOffset(),
 inFlightBatch.lastOffset()));
             }
@@ -2317,7 +2334,8 @@ public class SharePartition {
                     // by the client, then use the batch record state. This 
will always be present as it is a static
                     // mapping between bytes and record state type. All ack 
types have been added except for RENEW which
                     // has been handled above.
-                    RecordState recordState = 
ACK_TYPE_TO_RECORD_STATE.get(ackType);
+                    RecordState recordState = recordStateWithDlq(ackType);
+                    Throwable dlqCause = recordState == RecordState.ARCHIVING 
? ShareGroupDLQ.CLIENT_REJECT : null;
                     if (recordState == null) {
                         return Optional.of(new 
IllegalArgumentException("Unknown acknowledge type id: " + ackType));
                     }
@@ -2336,9 +2354,10 @@ public class SharePartition {
                         return Optional.of(new InvalidRecordStateException(
                             "Unable to acknowledge records for the batch"));
                     }
+
                     // Successfully updated the state of the offset and 
created a persister state batch for write to persister.
                     persisterBatches.add(new PersisterBatch(updateResult, new 
PersisterStateBatch(offsetState.getKey(),
-                        offsetState.getKey(), updateResult.state().id(), 
(short) updateResult.deliveryCount())));
+                        offsetState.getKey(), updateResult.state().id(), 
(short) updateResult.deliveryCount()), dlqCause));
                     if (isStateTerminal(updateResult.state())) {
                         deliveryCompleteCount.incrementAndGet();
                     }
@@ -2396,7 +2415,8 @@ public class SharePartition {
             // The member id is reset to EMPTY_MEMBER_ID irrespective of the 
ack type as the batch is
             // either released or moved to a state where member id existence 
is not important. The member id
             // is only important when the batch is acquired.
-            RecordState recordState = ACK_TYPE_TO_RECORD_STATE.get(ackType);
+            RecordState recordState = recordStateWithDlq(ackType);
+            Throwable dlqCause = recordState == RecordState.ARCHIVING ? 
ShareGroupDLQ.CLIENT_REJECT : null;
             if (recordState == null) {
                 return Optional.of(new IllegalArgumentException("Unknown 
acknowledge type id: " + ackType));
             }
@@ -2417,7 +2437,7 @@ public class SharePartition {
 
             // Successfully updated the state of the batch and created a 
persister state batch for write to persister.
             persisterBatches.add(new PersisterBatch(updateResult, new 
PersisterStateBatch(inFlightBatch.firstOffset(),
-                inFlightBatch.lastOffset(), updateResult.state().id(), (short) 
updateResult.deliveryCount())));
+                inFlightBatch.lastOffset(), updateResult.state().id(), (short) 
updateResult.deliveryCount()), dlqCause));
             if (isStateTerminal(updateResult.state())) {
                 deliveryCompleteCount.addAndGet((int) 
(inFlightBatch.lastOffset() - inFlightBatch.firstOffset() + 1));
             }
@@ -2527,12 +2547,96 @@ public class SharePartition {
 
                     log.trace("State change request successful for share 
partition: {}-{}",
                         groupId, topicIdPartition);
-                    persisterBatches.forEach(persisterBatch -> {
+
+                    List<PersisterBatch> nonDlqBatches = new 
ArrayList<>(persisterBatches.size());
+                    List<PersisterBatch> dlqBatches = new 
ArrayList<>(persisterBatches.size());
+                    for (PersisterBatch persisterBatch : persisterBatches) {
+                        if (persisterBatch.updatedState.state() == 
RecordState.ARCHIVING) {
+                            dlqBatches.add(persisterBatch);
+                        } else {
+                            nonDlqBatches.add(persisterBatch);
+                        }
+                    }
+
+                    nonDlqBatches.forEach(persisterBatch -> {
                         
persisterBatch.updatedState.completeStateTransition(true);
                         if (persisterBatch.updatedState.state() == 
RecordState.AVAILABLE) {
                             updateFindNextFetchOffset(true);
                         }
                     });
+
+                    dlqBatches.forEach(persisterBatch -> {
+                        
persisterBatch.updatedState.completeStateTransition(true);
+                        shareGroupDLQ.enqueue(new ShareGroupDLQRecordParameter(
+                            groupId,
+                            topicIdPartition,
+                            persisterBatch.stateBatch.firstOffset(),
+                            persisterBatch.stateBatch.lastOffset(),
+                            
Optional.of(persisterBatch.stateBatch.deliveryCount()),
+                            Optional.ofNullable(persisterBatch.dlqCause),
+                            false
+                        )).whenComplete((v1, dlqException) -> {
+                            PersisterStateBatch sb = persisterBatch.stateBatch;
+                            if (dlqException != null) {
+                                log.error("Failed to write to DLQ for share 
partition: {}-{}, offsets {}-{}. "
+                                        + "Proceeding to ARCHIVED state 
regardless.",
+                                    groupId, topicIdPartition, 
sb.firstOffset(), sb.lastOffset(), dlqException);
+                            }
+
+                            PersisterBatch phase2Batch;
+                            lock.writeLock().lock();
+                            try {
+                                // dlqBatch.updatedState() is the same 
InFlightState object in the cache,
+                                // now committed in ARCHIVING. Transition it 
directly.
+                                InFlightState updateResult = 
persisterBatch.updatedState().startStateTransition(
+                                    RecordState.ARCHIVED,
+                                    DeliveryCountOps.NO_OP,
+                                    maxDeliveryCount(),
+                                    EMPTY_MEMBER_ID
+                                );
+                                if (updateResult == null) {
+                                    log.error("Unable to transition ARCHIVING 
→ ARCHIVED for offsets {}-{} "
+                                            + "in share partition: {}-{}", 
sb.firstOffset(), sb.lastOffset(),
+                                        groupId, topicIdPartition);
+                                    return;
+                                }
+                                phase2Batch = new PersisterBatch(updateResult, 
new PersisterStateBatch(
+                                    sb.firstOffset(), sb.lastOffset(),
+                                    updateResult.state().id(), (short) 
updateResult.deliveryCount()), null);
+                                deliveryCompleteCount.addAndGet(
+                                    
numInFlightRecordsInBatch(sb.firstOffset(), sb.lastOffset()));
+                            } finally {
+                                lock.writeLock().unlock();
+                            }
+
+                            // Second persist: ARCHIVING → ARCHIVED
+                            
writeShareGroupState(List.of(phase2Batch.stateBatch()))
+                                .whenComplete((v2, phase2Exception) -> {
+                                    boolean phase2CacheUpdated = false;
+                                    lock.writeLock().lock();
+                                    try {
+                                        if (phase2Exception != null) {
+                                            log.error("Failed to persist 
ARCHIVED state for DLQ phase 2, "
+                                                    + "share partition: {}-{}. 
Records remain in ARCHIVING.",
+                                                groupId, topicIdPartition, 
phase2Exception);
+                                            
phase2Batch.updatedState().completeStateTransition(false);
+                                            if 
(isStateTerminal(RecordState.forId(phase2Batch.stateBatch().deliveryState()))
+                                                && 
!isStateTerminal(phase2Batch.updatedState().state())) {
+                                                
deliveryCompleteCount.addAndGet(
+                                                    
-numInFlightRecordsInBatch(sb.firstOffset(), sb.lastOffset()));
+                                            }
+                                            return;
+                                        }
+
+                                        
phase2Batch.updatedState().completeStateTransition(true);
+                                        phase2CacheUpdated = 
maybeUpdateCachedStateAndOffsets();
+                                    } finally {
+                                        lock.writeLock().unlock();
+                                        
maybeCompleteDelayedShareFetchRequest(phase2CacheUpdated);
+                                    }
+                                });
+                        });
+                    });
                     // Update the cached state and start and end offsets after 
acknowledging/releasing the acquired records.
                     cacheStateUpdated = maybeUpdateCachedStateAndOffsets();
                     future.complete(null);
@@ -3225,6 +3329,13 @@ public class SharePartition {
         return batch.isTransactional() && 
abortedProducerIds.contains(batch.producerId());
     }
 
+    private RecordState recordStateWithDlq(byte ackType) {
+        if (shareGroupDlqEnableSupplier.get() && AcknowledgeType.REJECT.id == 
ackType) {
+            return RecordState.ARCHIVING;
+        }
+        return ACK_TYPE_TO_RECORD_STATE.get(ackType);
+    }
+
     // Visible for testing.
     boolean containsAbortMarker(RecordBatch batch) {
         if (!batch.isControlBatch())
@@ -3397,7 +3508,8 @@ public class SharePartition {
      */
     private record PersisterBatch(
         InFlightState updatedState,
-        PersisterStateBatch stateBatch
+        PersisterStateBatch stateBatch,
+        Throwable dlqCause
     ) { }
 
     /**
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java 
b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index a6311c44bd8..3b4db2162ee 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -74,6 +74,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
+import java.util.function.Supplier;
 
 /**
  * The SharePartitionManager is responsible for managing the SharePartitions 
and ShareSessions.
@@ -148,6 +149,11 @@ public class SharePartitionManager implements 
AutoCloseable {
      */
     private final BrokerTopicStats brokerTopicStats;
 
+    /**
+     * Supplier indicating whether DLQ support is enabled.
+     */
+    private final Supplier<Boolean> shareGroupDlqEnableSupplier;
+
     public SharePartitionManager(
         ReplicaManager replicaManager,
         Time time,
@@ -158,7 +164,8 @@ public class SharePartitionManager implements AutoCloseable 
{
         long remoteFetchMaxWaitMs,
         Persister persister,
         ShareGroupConfigProvider configProvider,
-        BrokerTopicStats brokerTopicStats
+        BrokerTopicStats brokerTopicStats,
+        Supplier<Boolean> shareGroupDlqEnableSupplier
     ) {
         this(replicaManager,
             time,
@@ -171,7 +178,8 @@ public class SharePartitionManager implements AutoCloseable 
{
             persister,
             configProvider,
             new ShareGroupMetrics(time),
-            brokerTopicStats
+            brokerTopicStats,
+            shareGroupDlqEnableSupplier
         );
     }
 
@@ -187,7 +195,8 @@ public class SharePartitionManager implements AutoCloseable 
{
         Persister persister,
         ShareGroupConfigProvider configProvider,
         ShareGroupMetrics shareGroupMetrics,
-        BrokerTopicStats brokerTopicStats
+        BrokerTopicStats brokerTopicStats,
+        Supplier<Boolean> shareGroupDlqEnableSupplier
     ) {
         this(replicaManager,
             time,
@@ -202,11 +211,13 @@ public class SharePartitionManager implements 
AutoCloseable {
             persister,
             configProvider,
             shareGroupMetrics,
-            brokerTopicStats
+            brokerTopicStats,
+            shareGroupDlqEnableSupplier
         );
     }
 
     // Visible for testing.
+    @SuppressWarnings("ParameterNumber")
     SharePartitionManager(
             ReplicaManager replicaManager,
             Time time,
@@ -220,7 +231,8 @@ public class SharePartitionManager implements AutoCloseable 
{
             Persister persister,
             ShareGroupConfigProvider configProvider,
             ShareGroupMetrics shareGroupMetrics,
-            BrokerTopicStats brokerTopicStats
+            BrokerTopicStats brokerTopicStats,
+            Supplier<Boolean> shareGroupDlqEnableSupplier
     ) {
         this.replicaManager = replicaManager;
         this.time = time;
@@ -236,6 +248,7 @@ public class SharePartitionManager implements AutoCloseable 
{
         this.shareGroupMetrics = shareGroupMetrics;
         this.brokerTopicStats = brokerTopicStats;
         this.cache.registerShareGroupListener(new ShareGroupListenerImpl());
+        this.shareGroupDlqEnableSupplier = shareGroupDlqEnableSupplier;
     }
 
     /**
@@ -719,7 +732,8 @@ public class SharePartitionManager implements AutoCloseable 
{
                             persister,
                             replicaManager,
                             configProvider,
-                            listener
+                            listener,
+                            shareGroupDlqEnableSupplier
                     );
                 });
     }
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index fe0fd1d2ec5..5f190a18869 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -46,7 +46,7 @@ import org.apache.kafka.metadata.publisher.{AclPublisher, 
DelegationTokenPublish
 import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
 import org.apache.kafka.server.FetchSession.FetchSessionCache
 import org.apache.kafka.server.authorizer.Authorizer
-import org.apache.kafka.server.common.{ApiMessageAndVersion, 
DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
+import org.apache.kafka.server.common.{ApiMessageAndVersion, 
DirectoryEventHandler, NodeToControllerChannelManager, ShareVersion, 
TopicIdPartition}
 import org.apache.kafka.server.config.{ConfigType, 
DelegationTokenManagerConfigs}
 import org.apache.kafka.server.log.remote.metadata.storage.BrokerReadyCallback
 import org.apache.kafka.server.log.remote.storage.{RemoteLogManager, 
RemoteLogManagerConfig}
@@ -459,7 +459,8 @@ class BrokerServer(
         config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong,
         persister,
         new ShareGroupConfigProvider(groupConfigManager),
-        brokerTopicStats
+        brokerTopicStats,
+        () => 
ShareVersion.fromFeatureLevel(metadataCache.features.finalizedFeatures.getOrDefault(ShareVersion.FEATURE_NAME,
 0.toShort)).supportsShareGroupDLQ()
       )
 
       dataPlaneRequestProcessor = new KafkaApis(
diff --git 
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index c6c96f79230..80f941d488e 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -112,6 +112,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Consumer;
+import java.util.function.Supplier;
 
 import scala.Tuple2;
 import scala.collection.Seq;
@@ -3246,6 +3247,7 @@ public class SharePartitionManagerTest {
         private Timer timer = new MockTimer();
         private ShareGroupMetrics shareGroupMetrics = new 
ShareGroupMetrics(time);
         private BrokerTopicStats brokerTopicStats;
+        private Supplier<Boolean> shareGroupDlqEnableSupplier = () -> false;
 
         private SharePartitionManagerBuilder withReplicaManager(ReplicaManager 
replicaManager) {
             this.replicaManager = replicaManager;
@@ -3282,6 +3284,11 @@ public class SharePartitionManagerTest {
             return this;
         }
 
+        private SharePartitionManagerBuilder 
withShareGroupDlqEnableSupplier(Supplier<Boolean> shareGroupDlqEnableSupplier) {
+            this.shareGroupDlqEnableSupplier = shareGroupDlqEnableSupplier;
+            return this;
+        }
+
         public static SharePartitionManagerBuilder builder() {
             return new SharePartitionManagerBuilder();
         }
@@ -3299,7 +3306,8 @@ public class SharePartitionManagerTest {
                 persister,
                 new ShareGroupConfigProvider(mock(GroupConfigManager.class)),
                 shareGroupMetrics,
-                brokerTopicStats
+                brokerTopicStats,
+                shareGroupDlqEnableSupplier
             );
         }
     }
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 24d37dc0700..c850f9e85a6 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -96,6 +96,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 import static kafka.server.share.SharePartition.EMPTY_MEMBER_ID;
 import static 
org.apache.kafka.server.share.fetch.ShareFetchTestUtils.memoryRecordsBuilder;
@@ -12602,6 +12603,140 @@ public class SharePartitionTest {
         assertFalse(sharePartition.cachedState().isEmpty());
     }
 
+    @Test
+    public void testAcknowledgeRejectWithDlqEnabled() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withShareGroupDlqEnableSupplier(() -> true)
+            .build();
+
+        // Acquire 2 batches so that the first one stays in cache after being 
archived.
+        MemoryRecords records1 = memoryRecords(5, 5);
+        MemoryRecords records2 = memoryRecords(10, 5);
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition, records1, 5);
+        assertEquals(1, acquiredRecordsList.size());
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition, records2, 
5);
+        assertEquals(1, acquiredRecordsList.size());
+
+        // Acknowledge the first batch with REJECT.
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+            MEMBER_ID,
+            List.of(new ShareAcknowledgementBatch(5, 9, 
List.of(AcknowledgeType.REJECT.id))));
+        assertNull(ackResult.join());
+        assertFalse(ackResult.isCompletedExceptionally());
+
+        // With NoOp DLQ manager and NoOp persister, the full 2-phase flow 
completes synchronously:
+        // ACQUIRED -> ARCHIVING (phase 1 persist) -> DLQ enqueue -> ARCHIVED 
(phase 2 persist).
+        // The final state should be ARCHIVED.
+        assertNull(sharePartition.cachedState().get(5L));
+        assertEquals(1, sharePartition.cachedState().size());
+
+        // The second batch should remain acquired.
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(10L).batchState());
+
+        // Start offset advances past the archived batch.
+        assertEquals(10, sharePartition.startOffset());
+
+        // deliveryCompleteCount is 0 as evicted records are subtracted.
+        assertEquals(0, sharePartition.deliveryCompleteCount());
+    }
+
+    @Test
+    public void testAcknowledgeRejectWithDlqDisabled() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withShareGroupDlqEnableSupplier(() -> false)
+            .build();
+
+        MemoryRecords records1 = memoryRecords(5, 5);
+        MemoryRecords records2 = memoryRecords(10, 5);
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition, records1, 5);
+        assertEquals(1, acquiredRecordsList.size());
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition, records2, 
5);
+        assertEquals(1, acquiredRecordsList.size());
+
+        // Acknowledge the first batch with REJECT when DLQ is disabled.
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+            MEMBER_ID,
+            List.of(new ShareAcknowledgementBatch(5, 9, 
List.of(AcknowledgeType.REJECT.id))));
+        assertNull(ackResult.join());
+        assertFalse(ackResult.isCompletedExceptionally());
+
+        // Without DLQ, REJECT goes directly to ARCHIVED (no ARCHIVING 
intermediate state).
+        // Once ARCHIVED, the batch at start offset is evicted from cache.
+        assertEquals(1, sharePartition.cachedState().size());
+        assertNull(sharePartition.cachedState().get(5L));
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(10L).batchState());
+        assertEquals(10, sharePartition.startOffset());
+        assertEquals(0, sharePartition.deliveryCompleteCount());
+    }
+
+    @Test
+    public void testAcknowledgePerOffsetRejectWithDlqEnabled() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withShareGroupDlqEnableSupplier(() -> true)
+            .build();
+
+        // Acquire a batch with 5 records (offsets 0-4) and a second batch to 
keep cache populated.
+        MemoryRecords records1 = memoryRecords(5);
+        MemoryRecords records2 = memoryRecords(5, 5);
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition, records1, 5);
+        assertEquals(1, acquiredRecordsList.size());
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition, records2, 
5);
+        assertEquals(1, acquiredRecordsList.size());
+
+        // Acknowledge with per-offset ack types: ACCEPT for 0-2, REJECT for 
3-4.
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+            MEMBER_ID,
+            List.of(new ShareAcknowledgementBatch(0, 4, List.of(
+                AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id, 
AcknowledgeType.ACCEPT.id,
+                AcknowledgeType.REJECT.id, AcknowledgeType.REJECT.id))));
+        assertNull(ackResult.join());
+        assertFalse(ackResult.isCompletedExceptionally());
+
+        // With NoOp DLQ + NoOp persister, full 2-phase flow completes 
synchronously.
+        // Batch should have offset state since offsets have different states.
+        // Once all offsets reach terminal state, the batch at start offset is 
evicted.
+        assertNull(sharePartition.cachedState().get(0L));
+        assertEquals(1, sharePartition.cachedState().size());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).batchState());
+        assertEquals(5, sharePartition.startOffset());
+        assertEquals(0, sharePartition.deliveryCompleteCount());
+    }
+
+    @Test
+    public void testAcknowledgePerOffsetRejectWithDlqDisabled() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withShareGroupDlqEnableSupplier(() -> false)
+            .build();
+
+        MemoryRecords records1 = memoryRecords(5);
+        MemoryRecords records2 = memoryRecords(5, 5);
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition, records1, 5);
+        assertEquals(1, acquiredRecordsList.size());
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition, records2, 
5);
+        assertEquals(1, acquiredRecordsList.size());
+
+        // Acknowledge with per-offset ack types: ACCEPT for 0-2, REJECT for 
3-4.
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+            MEMBER_ID,
+            List.of(new ShareAcknowledgementBatch(0, 4, List.of(
+                AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id, 
AcknowledgeType.ACCEPT.id,
+                AcknowledgeType.REJECT.id, AcknowledgeType.REJECT.id))));
+        assertNull(ackResult.join());
+        assertFalse(ackResult.isCompletedExceptionally());
+
+        // Without DLQ, REJECT goes directly to ARCHIVED (no ARCHIVING 
intermediate state).
+        // All offsets in first batch reach terminal state, so it is evicted.
+        assertNull(sharePartition.cachedState().get(0L));
+        assertEquals(1, sharePartition.cachedState().size());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).batchState());
+        assertEquals(5, sharePartition.startOffset());
+        assertEquals(0, sharePartition.deliveryCompleteCount());
+    }
+
     private static ShareGroupConfigProvider configProviderWithRenewDisabled() {
         ShareGroupConfigProvider configProvider = 
Mockito.mock(ShareGroupConfigProvider.class);
         
Mockito.when(configProvider.isRenewAcknowledgeEnabled(GROUP_ID)).thenReturn(false);
@@ -12623,6 +12758,7 @@ public class SharePartitionTest {
         private SharePartitionState state = SharePartitionState.EMPTY;
         private Time time = MOCK_TIME;
         private SharePartitionMetrics sharePartitionMetrics = 
Mockito.mock(SharePartitionMetrics.class);
+        private Supplier<Boolean> shareGroupDlqEnableSupplier = () -> false;
 
         private SharePartitionBuilder withMaxInflightRecords(int 
defaultMaxInflightRecords) {
             this.defaultMaxInflightRecords = defaultMaxInflightRecords;
@@ -12669,6 +12805,11 @@ public class SharePartitionTest {
             return this;
         }
 
+        private SharePartitionBuilder 
withShareGroupDlqEnableSupplier(Supplier<Boolean> shareGroupDlqEnableSupplier) {
+            this.shareGroupDlqEnableSupplier = shareGroupDlqEnableSupplier;
+            return this;
+        }
+
         public static SharePartitionBuilder builder() {
             return new SharePartitionBuilder();
         }
@@ -12676,7 +12817,7 @@ public class SharePartitionTest {
         public SharePartition build() {
             return new SharePartition(GROUP_ID, TOPIC_ID_PARTITION, 0, 
defaultMaxInflightRecords, defaultMaxDeliveryCount,
                     defaultAcquisitionLockTimeoutMs, mockTimer, time, 
persister, replicaManager, configProvider,
-                    state, Mockito.mock(SharePartitionListener.class), 
sharePartitionMetrics);
+                    state, Mockito.mock(SharePartitionListener.class), 
sharePartitionMetrics, shareGroupDlqEnableSupplier);
         }
     }
 }
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/NoOpShareGroupDLQManager.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/NoOpShareGroupDLQManager.java
index e7cb9efc615..cd7b971b47b 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/NoOpShareGroupDLQManager.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/NoOpShareGroupDLQManager.java
@@ -17,6 +17,9 @@
 
 package org.apache.kafka.server.share.dlq;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -25,8 +28,11 @@ import java.util.concurrent.CompletableFuture;
  * a successfully completed future.
  */
 public class NoOpShareGroupDLQManager implements ShareGroupDLQ {
+    private static final Logger log = 
LoggerFactory.getLogger(NoOpShareGroupDLQManager.class);
+
     @Override
     public CompletableFuture<Void> enqueue(ShareGroupDLQRecordParameter param) 
{
+        log.trace("Enqueuing share group dlq record parameter: {}", param);
         return CompletableFuture.completedFuture(null);
     }
 }
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQ.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQ.java
index 8b56cc64073..96bf848076c 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQ.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQ.java
@@ -30,12 +30,8 @@ public interface ShareGroupDLQ {
         }
     }
 
-    Throwable STALE_BATCH = new ShareGroupDLQThrowable("Offset part of stale 
batch.");
-    Throwable BEHIND_LSO = new ShareGroupDLQThrowable("Offset before LSO.");
-    Throwable ABORTED_TRANSACTION = new ShareGroupDLQThrowable("Offset part of 
aborted transaction.");
     Throwable CLIENT_REJECT = new ShareGroupDLQThrowable("Offset rejected by 
client.");
     Throwable DELIVERY_COUNT_EXCEEDED = new ShareGroupDLQThrowable("Offset 
delivery count exceeded the threshold.");
-    Throwable ACQUISITION_LOCK_TIMEOUT = new 
ShareGroupDLQThrowable("Acquisition lock timed out.");
 
     /**
      * Main method exposed to the world to enqueuing a record to the share 
groups dead letter queue.
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordParameter.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordParameter.java
index ba15f41b13f..465279d1229 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordParameter.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordParameter.java
@@ -38,7 +38,7 @@ public record ShareGroupDLQRecordParameter(
     TopicIdPartition topicIdPartition,
     long firstOffset,
     long lastOffset,
-    Optional<Integer> deliveryCount,
+    Optional<Short> deliveryCount,
     Optional<Throwable> cause,
     boolean preserveRecordData
 ) {


Reply via email to