This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 55764f837b7 KAFKA-19859: Start offset moved incorrectly on receiving 
acknowledgements for offset post LSO movement (#20823)
55764f837b7 is described below

commit 55764f837b744dcaf2295f4af5170bf25c2f48e9
Author: Abhinav Dixit <[email protected]>
AuthorDate: Wed Nov 5 02:17:34 2025 +0530

    KAFKA-19859: Start offset moved incorrectly on receiving acknowledgements 
for offset post LSO movement (#20823)
    
    ### About
    This PR fixes a bug where the start offset could be moved backwards when
    acknowledgements were received for offsets after the LSO had moved.
    
    ### Test
    Added unit tests covering the erroneous scenario.
    
    Reviewers: Apoorv Mittal <[email protected]>
---
 .../java/kafka/server/share/SharePartition.java    |  12 +-
 .../kafka/server/share/SharePartitionTest.java     | 168 ++++++++++++++++++++-
 2 files changed, 175 insertions(+), 5 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index 3e311e3fcd9..d77493186d4 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -2375,7 +2375,10 @@ public class SharePartition {
             NavigableMap.Entry<Long, InFlightBatch> entry = 
cachedState.floorEntry(lastOffsetAcknowledged);
             // If the lastOffsetAcknowledged is equal to the last offset of 
entry, then the entire batch can potentially be removed.
             if (lastOffsetAcknowledged == entry.getValue().lastOffset()) {
-                startOffset = cachedState.higherKey(lastOffsetAcknowledged);
+                long lastOffsetAcknowledgedHigherKey = 
cachedState.higherKey(lastOffsetAcknowledged);
+                if (lastOffsetAcknowledgedHigherKey > startOffset) {
+                    startOffset = lastOffsetAcknowledgedHigherKey;
+                }
                 if (isPersisterReadGapWindowActive()) {
                     // This case will arise if we have a situation where there 
is an acquirable gap after the lastOffsetAcknowledged.
                     // Ex, the cachedState has following state batches -> {(0, 
10), (11, 20), (31,40)} and all these batches are acked.
@@ -2389,8 +2392,11 @@ public class SharePartition {
                 lastKeyToRemove = entry.getKey();
             } else {
                 // The code will reach this point only if 
lastOffsetAcknowledged is in the middle of some stateBatch. In this case
-                // we can simply move the startOffset to the next offset of 
lastOffsetAcknowledged and should consider any read gap offsets.
-                startOffset = lastOffsetAcknowledged + 1;
+                // we can move the startOffset to the next offset of 
lastOffsetAcknowledged only if that offset is
+                // ahead of start offset and should consider any read gap 
offsets.
+                if (lastOffsetAcknowledged + 1 > startOffset) {
+                    startOffset = lastOffsetAcknowledged + 1;
+                }
                 if (entry.getKey().equals(cachedState.firstKey())) {
                     // If the first batch in cachedState has some records yet 
to be acknowledged,
                     // then nothing should be removed from cachedState
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index eb20dc9b8e3..08b5aa10f87 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -10159,8 +10159,7 @@ public class SharePartitionTest {
 
         // Expire timer
         mockTimer.advanceClock(ACQUISITION_LOCK_TIMEOUT_MS + 1);    // Trigger 
expire
-        // todo: index 2 in expectedStates should be RecordState.ARCHIVED - 
fix after ticket KAFKA-19859 is addressed.
-        List<RecordState> expectedStates = List.of(RecordState.ARCHIVED, 
RecordState.ACKNOWLEDGED, RecordState.AVAILABLE, RecordState.ACKNOWLEDGED, 
RecordState.AVAILABLE);
+        List<RecordState> expectedStates = List.of(RecordState.ARCHIVED, 
RecordState.ACKNOWLEDGED, RecordState.ARCHIVED, RecordState.ACKNOWLEDGED, 
RecordState.AVAILABLE);
         for (long i = 0; i <= 4; i++) {
             InFlightState offset = 
sharePartition.cachedState().get(0L).offsetState().get(i);
             assertNull(offset.acquisitionLockTimeoutTask());
@@ -10233,6 +10232,171 @@ public class SharePartitionTest {
         Mockito.verify(persister, Mockito.times(1)).writeState(Mockito.any());
     }
 
+    @Test // Regression test for KAFKA-19859: offsets still ACQUIRED below the moved LSO must be ARCHIVED, not made AVAILABLE, on lock expiry.
+    public void testLsoMovementWithPendingAcknowledgements() throws 
InterruptedException {
+        Persister persister = Mockito.mock(Persister.class);
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+            .withMaxDeliveryCount(2)
+            .withPersister(persister)
+            .build();
+
+        List<AcquiredRecords> records = fetchAcquiredRecords(sharePartition, 
memoryRecords(0, 5), 5);
+        assertEquals(1, records.size());
+        assertEquals(0, records.get(0).firstOffset());
+        assertEquals(4, records.get(0).lastOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(0L).batchState());
+        AcquisitionLockTimerTask taskOrig = 
sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask();
+
+        WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), 
Errors.NONE.message())))));
+        
when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+        // Acknowledge offsets 1 and 3 out of 0-4 with ACCEPT.
+        sharePartition.acknowledge(MEMBER_ID, List.of(
+            new ShareAcknowledgementBatch(1, 1, 
List.of(AcknowledgeType.ACCEPT.id)),
+            new ShareAcknowledgementBatch(3, 3, 
List.of(AcknowledgeType.ACCEPT.id))));
+
+        // Move LSO to 3 while offsets 0, 2 and 4 are still ACQUIRED.
+        sharePartition.updateCacheAndOffsets(3);
+
+        assertEquals(5, sharePartition.nextFetchOffset());
+        assertEquals(3, sharePartition.startOffset());
+        assertEquals(4, sharePartition.endOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+
+        assertTrue(taskOrig.isCancelled()); // Original acq lock cancelled.
+        assertNotNull(sharePartition.cachedState().get(0L).offsetState()); // Batch is now tracked per offset; check before dereferencing.
+        assertNotEquals(taskOrig, 
sharePartition.cachedState().get(0L).offsetState().get(0L).acquisitionLockTimeoutTask());
+
+        InFlightState offset0 = 
sharePartition.cachedState().get(0L).offsetState().get(0L);
+        InFlightState offset1 = 
sharePartition.cachedState().get(0L).offsetState().get(1L);
+        InFlightState offset2 = 
sharePartition.cachedState().get(0L).offsetState().get(2L);
+        InFlightState offset3 = 
sharePartition.cachedState().get(0L).offsetState().get(3L);
+        InFlightState offset4 = 
sharePartition.cachedState().get(0L).offsetState().get(4L);
+
+        assertEquals(RecordState.ACQUIRED, offset0.state());
+        assertNotNull(offset0.acquisitionLockTimeoutTask());
+
+        assertEquals(RecordState.ACKNOWLEDGED, offset1.state());
+        assertNull(offset1.acquisitionLockTimeoutTask());
+
+        assertEquals(RecordState.ACQUIRED, offset2.state());
+        assertNotNull(offset2.acquisitionLockTimeoutTask());
+
+        assertEquals(RecordState.ACKNOWLEDGED, offset3.state());
+        assertNull(offset3.acquisitionLockTimeoutTask());
+
+        assertEquals(RecordState.ACQUIRED, offset4.state());
+        assertNotNull(offset4.acquisitionLockTimeoutTask());
+
+        assertEquals(3, sharePartition.timer().size()); // offsets 0, 2 and 4 
are still in ACQUIRED state.
+
+        // Expire acquisition lock timeout: offsets 0 and 2 (below start offset 3) are expected to be ARCHIVED, offset 4 AVAILABLE.
+        mockTimer.advanceClock(ACQUISITION_LOCK_TIMEOUT_MS + 1);
+        List<RecordState> expectedStates = List.of(RecordState.ARCHIVED, 
RecordState.ACKNOWLEDGED, RecordState.ARCHIVED, RecordState.ACKNOWLEDGED, 
RecordState.AVAILABLE);
+        for (long i = 0; i <= 4; i++) {
+            InFlightState offset = 
sharePartition.cachedState().get(0L).offsetState().get(i);
+            assertNull(offset.acquisitionLockTimeoutTask());
+            assertEquals(expectedStates.get((int) i), offset.state());
+        }
+
+        assertEquals(0, sharePartition.timer().size()); // All timer jobs have 
completed
+        Mockito.verify(persister, Mockito.times(4)).writeState(Mockito.any());
+    }
+
+    @Test // Batch-level variant: batches 0-4, 10-14 and 20-24 stay ACQUIRED while LSO moves to 12; on expiry only offsets below 12 are ARCHIVED.
+    public void testLsoMovementWithPendingAcknowledgementsForBatches() throws 
InterruptedException {
+        Persister persister = Mockito.mock(Persister.class);
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+            .withMaxDeliveryCount(2)
+            .withPersister(persister)
+            .build();
+
+        fetchAcquiredRecords(sharePartition, memoryRecords(0, 5), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(10, 5), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(15, 5), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(20, 5), 5);
+        assertEquals(5, sharePartition.cachedState().size());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(0L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(10L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(20L).batchState());
+
+        WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), 
Errors.NONE.message())))));
+        
when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+        // Acknowledge batches 5-9 and 15-19 with ACCEPT; batches 0-4, 10-14 and 20-24 remain ACQUIRED.
+        sharePartition.acknowledge(MEMBER_ID, List.of(
+            new ShareAcknowledgementBatch(5, 9, 
List.of(AcknowledgeType.ACCEPT.id)),
+            new ShareAcknowledgementBatch(15, 19, 
List.of(AcknowledgeType.ACCEPT.id))));
+
+        // Move LSO to 12, which falls inside the still-ACQUIRED batch 10-14.
+        sharePartition.updateCacheAndOffsets(12);
+
+        assertEquals(25, sharePartition.nextFetchOffset());
+        assertEquals(12, sharePartition.startOffset());
+        assertEquals(24, sharePartition.endOffset());
+        assertEquals(5, sharePartition.cachedState().size());
+
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(0L).batchState());
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(5L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(10L).batchState());
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(20L).batchState());
+        
assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask());
+        
assertNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
+        
assertNotNull(sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask());
+        
assertNull(sharePartition.cachedState().get(15L).batchAcquisitionLockTimeoutTask());
+        
assertNotNull(sharePartition.cachedState().get(20L).batchAcquisitionLockTimeoutTask());
+
+        assertEquals(3, sharePartition.timer().size()); // One lock each for the ACQUIRED batches 0-4, 10-14 and 20-24.
+
+        // Expire acquisition lock timeout. ACQUIRED offsets below the start offset (12) are expected to be ARCHIVED, those at or above it AVAILABLE.
+        mockTimer.advanceClock(ACQUISITION_LOCK_TIMEOUT_MS + 1);
+
+        assertEquals(12, sharePartition.nextFetchOffset()); // Offset 12 is now the first AVAILABLE offset.
+        assertEquals(12, sharePartition.startOffset());
+        assertEquals(24, sharePartition.endOffset());
+        assertEquals(5, sharePartition.cachedState().size());
+
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(0L).batchState());
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(5L).batchState());
+        // Batch 10-14 will now be tracked on a per-offset basis because the start offset (12) splits it.
+        assertNotNull(sharePartition.cachedState().get(10L).offsetState());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(10L).offsetState().get(10L).state());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(10L).offsetState().get(11L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(10L).offsetState().get(12L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(10L).offsetState().get(13L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(10L).offsetState().get(14L).state());
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(20L).batchState());
+
+        
assertNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask());
+        
assertNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
+        
assertNull(sharePartition.cachedState().get(10L).offsetState().get(10L).acquisitionLockTimeoutTask());
+        
assertNull(sharePartition.cachedState().get(10L).offsetState().get(11L).acquisitionLockTimeoutTask());
+        
assertNull(sharePartition.cachedState().get(10L).offsetState().get(12L).acquisitionLockTimeoutTask());
+        
assertNull(sharePartition.cachedState().get(10L).offsetState().get(13L).acquisitionLockTimeoutTask());
+        
assertNull(sharePartition.cachedState().get(10L).offsetState().get(14L).acquisitionLockTimeoutTask());
+        
assertNull(sharePartition.cachedState().get(15L).batchAcquisitionLockTimeoutTask());
+        
assertNull(sharePartition.cachedState().get(20L).batchAcquisitionLockTimeoutTask());
+
+        assertEquals(0, sharePartition.timer().size()); // All timer jobs have 
completed
+        Mockito.verify(persister, Mockito.times(4)).writeState(Mockito.any());
+    }
+
     /**
      * This function produces transactional data of a given no. of records 
followed by a transactional marker (COMMIT/ABORT).
      */

Reply via email to