This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 55764f837b7 KAFKA-19859: Start offset moved incorrectly on receiving
acknowledgements for offset post LSO movement (#20823)
55764f837b7 is described below
commit 55764f837b744dcaf2295f4af5170bf25c2f48e9
Author: Abhinav Dixit <[email protected]>
AuthorDate: Wed Nov 5 02:17:34 2025 +0530
KAFKA-19859: Start offset moved incorrectly on receiving acknowledgements
for offset post LSO movement (#20823)
### About
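This PR fixes a bug where the start offset was moved incorrectly when
acknowledgements were received for offsets after the LSO had moved. The
start offset is now updated only when the new value is ahead of the
current start offset.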
### Test
Added unit tests to cover the erroneous scenario.
Reviewers: Apoorv Mittal <[email protected]>
---
.../java/kafka/server/share/SharePartition.java | 12 +-
.../kafka/server/share/SharePartitionTest.java | 168 ++++++++++++++++++++-
2 files changed, 175 insertions(+), 5 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index 3e311e3fcd9..d77493186d4 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -2375,7 +2375,10 @@ public class SharePartition {
NavigableMap.Entry<Long, InFlightBatch> entry =
cachedState.floorEntry(lastOffsetAcknowledged);
// If the lastOffsetAcknowledged is equal to the last offset of
entry, then the entire batch can potentially be removed.
if (lastOffsetAcknowledged == entry.getValue().lastOffset()) {
- startOffset = cachedState.higherKey(lastOffsetAcknowledged);
+ long lastOffsetAcknowledgedHigherKey =
cachedState.higherKey(lastOffsetAcknowledged);
+ if (lastOffsetAcknowledgedHigherKey > startOffset) {
+ startOffset = lastOffsetAcknowledgedHigherKey;
+ }
if (isPersisterReadGapWindowActive()) {
// This case will arise if we have a situation where there
is an acquirable gap after the lastOffsetAcknowledged.
// Ex, the cachedState has following state batches -> {(0,
10), (11, 20), (31,40)} and all these batches are acked.
@@ -2389,8 +2392,11 @@ public class SharePartition {
lastKeyToRemove = entry.getKey();
} else {
// The code will reach this point only if
lastOffsetAcknowledged is in the middle of some stateBatch. In this case
- // we can simply move the startOffset to the next offset of
lastOffsetAcknowledged and should consider any read gap offsets.
- startOffset = lastOffsetAcknowledged + 1;
+ // we can move the startOffset to the next offset of
lastOffsetAcknowledged only if that offset is
+ // ahead of start offset and should consider any read gap
offsets.
+ if (lastOffsetAcknowledged + 1 > startOffset) {
+ startOffset = lastOffsetAcknowledged + 1;
+ }
if (entry.getKey().equals(cachedState.firstKey())) {
// If the first batch in cachedState has some records yet
to be acknowledged,
// then nothing should be removed from cachedState
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index eb20dc9b8e3..08b5aa10f87 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -10159,8 +10159,7 @@ public class SharePartitionTest {
// Expire timer
mockTimer.advanceClock(ACQUISITION_LOCK_TIMEOUT_MS + 1); // Trigger
expire
- // todo: index 2 in expectedStates should be RecordState.ARCHIVED -
fix after ticket KAFKA-19859 is addressed.
- List<RecordState> expectedStates = List.of(RecordState.ARCHIVED,
RecordState.ACKNOWLEDGED, RecordState.AVAILABLE, RecordState.ACKNOWLEDGED,
RecordState.AVAILABLE);
+ List<RecordState> expectedStates = List.of(RecordState.ARCHIVED,
RecordState.ACKNOWLEDGED, RecordState.ARCHIVED, RecordState.ACKNOWLEDGED,
RecordState.AVAILABLE);
for (long i = 0; i <= 4; i++) {
InFlightState offset =
sharePartition.cachedState().get(0L).offsetState().get(i);
assertNull(offset.acquisitionLockTimeoutTask());
@@ -10233,6 +10232,171 @@ public class SharePartitionTest {
Mockito.verify(persister, Mockito.times(1)).writeState(Mockito.any());
}
+ @Test
+ public void testLsoMovementWithPendingAcknowledgements() throws
InterruptedException {
+ Persister persister = Mockito.mock(Persister.class);
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+ .withMaxDeliveryCount(2)
+ .withPersister(persister)
+ .build();
+
+ List<AcquiredRecords> records = fetchAcquiredRecords(sharePartition,
memoryRecords(0, 5), 5);
+ assertEquals(1, records.size());
+ assertEquals(0, records.get(0).firstOffset());
+ assertEquals(4, records.get(0).lastOffset());
+ assertEquals(1, sharePartition.cachedState().size());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(0L).batchState());
+ AcquisitionLockTimerTask taskOrig =
sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask();
+
+ WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(),
Errors.NONE.message())))));
+
when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+ // Acknowledge offsets 1 and 3 out of 0-4 with ACCEPT.
+ sharePartition.acknowledge(MEMBER_ID, List.of(
+ new ShareAcknowledgementBatch(1, 1,
List.of(AcknowledgeType.ACCEPT.id)),
+ new ShareAcknowledgementBatch(3, 3,
List.of(AcknowledgeType.ACCEPT.id))));
+
+ // Move LSO to 3.
+ sharePartition.updateCacheAndOffsets(3);
+
+ assertEquals(5, sharePartition.nextFetchOffset());
+ assertEquals(3, sharePartition.startOffset());
+ assertEquals(4, sharePartition.endOffset());
+ assertEquals(1, sharePartition.cachedState().size());
+
+ assertTrue(taskOrig.isCancelled()); // Original acq lock cancelled.
+ assertNotEquals(taskOrig,
sharePartition.cachedState().get(0L).offsetState().get(0L).acquisitionLockTimeoutTask());
+ assertNotNull(sharePartition.cachedState().get(0L).offsetState());
+
+ InFlightState offset0 =
sharePartition.cachedState().get(0L).offsetState().get(0L);
+ InFlightState offset1 =
sharePartition.cachedState().get(0L).offsetState().get(1L);
+ InFlightState offset2 =
sharePartition.cachedState().get(0L).offsetState().get(2L);
+ InFlightState offset3 =
sharePartition.cachedState().get(0L).offsetState().get(3L);
+ InFlightState offset4 =
sharePartition.cachedState().get(0L).offsetState().get(4L);
+
+ assertEquals(RecordState.ACQUIRED, offset0.state());
+ assertNotNull(offset0.acquisitionLockTimeoutTask());
+
+ assertEquals(RecordState.ACKNOWLEDGED, offset1.state());
+ assertNull(offset1.acquisitionLockTimeoutTask());
+
+ assertEquals(RecordState.ACQUIRED, offset2.state());
+ assertNotNull(offset2.acquisitionLockTimeoutTask());
+
+ assertEquals(RecordState.ACKNOWLEDGED, offset3.state());
+ assertNull(offset3.acquisitionLockTimeoutTask());
+
+ assertEquals(RecordState.ACQUIRED, offset4.state());
+ assertNotNull(offset4.acquisitionLockTimeoutTask());
+
+ assertEquals(3, sharePartition.timer().size()); // offsets 0,2 and 4
are still in ACQUIRED state.
+
+ // Expire acquisition lock timeout
+ mockTimer.advanceClock(ACQUISITION_LOCK_TIMEOUT_MS + 1);
+ List<RecordState> expectedStates = List.of(RecordState.ARCHIVED,
RecordState.ACKNOWLEDGED, RecordState.ARCHIVED, RecordState.ACKNOWLEDGED,
RecordState.AVAILABLE);
+ for (long i = 0; i <= 4; i++) {
+ InFlightState offset =
sharePartition.cachedState().get(0L).offsetState().get(i);
+ assertNull(offset.acquisitionLockTimeoutTask());
+ assertEquals(expectedStates.get((int) i), offset.state());
+ }
+
+ assertEquals(0, sharePartition.timer().size()); // All timer jobs have
completed
+ Mockito.verify(persister, Mockito.times(4)).writeState(Mockito.any());
+ }
+
+ @Test
+ public void testLsoMovementWithPendingAcknowledgementsForBatches() throws
InterruptedException {
+ Persister persister = Mockito.mock(Persister.class);
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+ .withMaxDeliveryCount(2)
+ .withPersister(persister)
+ .build();
+
+ fetchAcquiredRecords(sharePartition, memoryRecords(0, 5), 5);
+ fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
+ fetchAcquiredRecords(sharePartition, memoryRecords(10, 5), 5);
+ fetchAcquiredRecords(sharePartition, memoryRecords(15, 5), 5);
+ fetchAcquiredRecords(sharePartition, memoryRecords(20, 5), 5);
+ assertEquals(5, sharePartition.cachedState().size());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(0L).batchState());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(5L).batchState());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(10L).batchState());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(15L).batchState());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(20L).batchState());
+
+ WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(),
Errors.NONE.message())))));
+
when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+ // Acknowledge batches 5-9 and 15-19 with ACCEPT.
+ sharePartition.acknowledge(MEMBER_ID, List.of(
+ new ShareAcknowledgementBatch(5, 9,
List.of(AcknowledgeType.ACCEPT.id)),
+ new ShareAcknowledgementBatch(15, 19,
List.of(AcknowledgeType.ACCEPT.id))));
+
+ // Move LSO to 12.
+ sharePartition.updateCacheAndOffsets(12);
+
+ assertEquals(25, sharePartition.nextFetchOffset());
+ assertEquals(12, sharePartition.startOffset());
+ assertEquals(24, sharePartition.endOffset());
+ assertEquals(5, sharePartition.cachedState().size());
+
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(0L).batchState());
+ assertEquals(RecordState.ACKNOWLEDGED,
sharePartition.cachedState().get(5L).batchState());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(10L).batchState());
+ assertEquals(RecordState.ACKNOWLEDGED,
sharePartition.cachedState().get(15L).batchState());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(20L).batchState());
+
assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
+
assertNotNull(sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(15L).batchAcquisitionLockTimeoutTask());
+
assertNotNull(sharePartition.cachedState().get(20L).batchAcquisitionLockTimeoutTask());
+
+ assertEquals(3, sharePartition.timer().size());
+
+ // Expire acquisition lock timeout.
+ mockTimer.advanceClock(ACQUISITION_LOCK_TIMEOUT_MS + 1);
+
+ assertEquals(12, sharePartition.nextFetchOffset());
+ assertEquals(12, sharePartition.startOffset());
+ assertEquals(24, sharePartition.endOffset());
+ assertEquals(5, sharePartition.cachedState().size());
+
+ assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(0L).batchState());
+ assertEquals(RecordState.ACKNOWLEDGED,
sharePartition.cachedState().get(5L).batchState());
+ // Batch 10-14 will now be tracked on a per-offset basis.
+ assertNotNull(sharePartition.cachedState().get(10L).offsetState());
+ assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(10L).offsetState().get(10L).state());
+ assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(10L).offsetState().get(11L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(10L).offsetState().get(12L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(10L).offsetState().get(13L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(10L).offsetState().get(14L).state());
+ assertEquals(RecordState.ACKNOWLEDGED,
sharePartition.cachedState().get(15L).batchState());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(20L).batchState());
+
+
assertNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(10L).offsetState().get(10L).acquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(10L).offsetState().get(11L).acquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(10L).offsetState().get(12L).acquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(10L).offsetState().get(13L).acquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(10L).offsetState().get(14L).acquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(15L).batchAcquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(20L).batchAcquisitionLockTimeoutTask());
+
+ assertEquals(0, sharePartition.timer().size()); // All timer jobs have
completed
+ Mockito.verify(persister, Mockito.times(4)).writeState(Mockito.any());
+ }
+
/**
* This function produces transactional data of a given no. of records
followed by a transactional marker (COMMIT/ABORT).
*/