This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dc96e294995 KAFKA-19476: Correcting max delivery on write state 
failure and lock timeout (#20310)
dc96e294995 is described below

commit dc96e2949958aa6020c594681d06509a0fc90f54
Author: Apoorv Mittal <[email protected]>
AuthorDate: Thu Aug 7 19:22:00 2025 +0100

    KAFKA-19476: Correcting max delivery on write state failure and lock 
timeout (#20310)
    
    Fix the max delivery count check when the acquisition lock times out
    and the write state RPC fails.
    
    When the acquisition lock has already timed out and the write state RPC
    then fails, we need to check whether the records must be archived.
    However, with this fix we do not persist the resulting information
    right away, even though it is relevant because some records may be
    archived or have their delivery count bumped. The information will be
    persisted eventually.
    
    The persister call has already failed, so issuing another persister
    call in response to that failure is not correct. Instead, let the data
    be persisted by future persister calls.
    
    Reviewers: Manikumar Reddy <[email protected]>, Abhinav Dixit
     <[email protected]>
---
 .../kafka/server/share/SharePartitionTest.java     | 99 ++++++++++++++++++++++
 .../kafka/server/share/fetch/InFlightState.java    | 27 ++++--
 2 files changed, 120 insertions(+), 6 deletions(-)

diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 1289d720054..30b49e17b16 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -7878,6 +7878,105 @@ public class SharePartitionTest {
         
assertNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask());
     }
 
+    @Test
+    public void testRecordArchivedWithWriteStateRPCFailure() throws 
InterruptedException {
+        Persister persister = Mockito.mock(Persister.class);
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+            .withMaxDeliveryCount(2)
+            .withPersister(persister)
+            .build();
+
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 5);
+
+        // Futures which will be completed later, so the batch state has 
ongoing transition.
+        CompletableFuture<WriteShareGroupStateResult> future1 = new 
CompletableFuture<>();
+        CompletableFuture<WriteShareGroupStateResult> future2 = new 
CompletableFuture<>();
+        
Mockito.when(persister.writeState(Mockito.any())).thenReturn(future1).thenReturn(future2);
+
+        // Acknowledge batches.
+        sharePartition.acknowledge(MEMBER_ID, List.of(new 
ShareAcknowledgementBatch(3, 3, List.of(AcknowledgeType.ACCEPT.id))));
+        sharePartition.acknowledge(MEMBER_ID, List.of(new 
ShareAcknowledgementBatch(7, 11, List.of(AcknowledgeType.ACCEPT.id))));
+
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(2L).offsetState().get(3L).state());
+        assertEquals(1, 
sharePartition.cachedState().get(2L).offsetState().get(3L).deliveryCount());
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(7L).batchState());
+        assertEquals(1, 
sharePartition.cachedState().get(7L).batchDeliveryCount());
+
+        WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, 
Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message())))));
+
+        future1.complete(writeShareGroupStateResult);
+        assertEquals(12, sharePartition.nextFetchOffset());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(2L).offsetState().get(3L).state());
+        assertEquals(1, 
sharePartition.cachedState().get(2L).offsetState().get(3L).deliveryCount());
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(7L).batchState());
+        assertEquals(1, 
sharePartition.cachedState().get(7L).batchDeliveryCount());
+
+        future2.complete(writeShareGroupStateResult);
+        assertEquals(12L, sharePartition.nextFetchOffset());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(2L).offsetState().get(3L).state());
+        assertEquals(1, 
sharePartition.cachedState().get(2L).offsetState().get(3L).deliveryCount());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(7L).batchState());
+        assertEquals(1, 
sharePartition.cachedState().get(7L).batchDeliveryCount());
+
+        // Allowing acquisition lock to expire. This will also ensure that 
acquisition lock timeout task
+        // is run successfully post write state RPC failure.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+        TestUtils.waitForCondition(
+            () -> 
sharePartition.cachedState().get(2L).offsetState().get(3L).state() == 
RecordState.AVAILABLE  &&
+                sharePartition.cachedState().get(7L).batchState() == 
RecordState.AVAILABLE &&
+                
sharePartition.cachedState().get(2L).offsetState().get(3L).deliveryCount() == 1 
&&
+                sharePartition.cachedState().get(7L).batchDeliveryCount() == 1 
&&
+                sharePartition.timer().size() == 0,
+            DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+            () -> assertionFailedMessage(sharePartition, Map.of(2L, 
List.of(3L), 7L, List.of())));
+        // Acquisition lock timeout task has run already and next fetch offset 
is moved to 2.
+        assertEquals(2, sharePartition.nextFetchOffset());
+        // Send the same batches again.
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 5);
+
+        future1 = new CompletableFuture<>();
+        future2 = new CompletableFuture<>();
+        
Mockito.when(persister.writeState(Mockito.any())).thenReturn(future1).thenReturn(future2);
+
+        sharePartition.acknowledge(MEMBER_ID, List.of(new 
ShareAcknowledgementBatch(3, 3, List.of(AcknowledgeType.ACCEPT.id))));
+        sharePartition.acknowledge(MEMBER_ID, List.of(new 
ShareAcknowledgementBatch(7, 11, List.of(AcknowledgeType.ACCEPT.id))));
+
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+        // Verify the timer tasks have run and the state is archived for the 
offsets which are not acknowledged,
+        // but the acquisition lock timeout task should be just expired for 
acknowledged offsets, though
+        // the state should not be archived.
+        TestUtils.waitForCondition(
+            () -> 
sharePartition.cachedState().get(2L).offsetState().get(2L).state() == 
RecordState.ARCHIVED  &&
+                
sharePartition.cachedState().get(2L).offsetState().get(3L).state() == 
RecordState.ACKNOWLEDGED  &&
+                
sharePartition.cachedState().get(2L).offsetState().get(3L).acquisitionLockTimeoutTask().hasExpired()
 &&
+                sharePartition.cachedState().get(7L).batchState() == 
RecordState.ACKNOWLEDGED &&
+                
sharePartition.cachedState().get(7L).batchAcquisitionLockTimeoutTask().hasExpired(),
+            DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+            () -> assertionFailedMessage(sharePartition, Map.of(2L, 
List.of(3L), 7L, List.of())));
+
+        future1.complete(writeShareGroupStateResult);
+        // Now the state should be archived for the offsets despite the write 
state RPC failure, as the
+        // delivery count has reached the max delivery count and the 
acquisition lock timeout task
+        // has already expired for the offsets which were acknowledged.
+        assertEquals(12, sharePartition.nextFetchOffset());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(2L).offsetState().get(3L).state());
+        assertEquals(2, 
sharePartition.cachedState().get(2L).offsetState().get(3L).deliveryCount());
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(7L).batchState());
+        assertEquals(2, 
sharePartition.cachedState().get(7L).batchDeliveryCount());
+
+        future2.complete(writeShareGroupStateResult);
+        assertEquals(12L, sharePartition.nextFetchOffset());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(7L).batchState());
+        assertEquals(2, 
sharePartition.cachedState().get(7L).batchDeliveryCount());
+    }
+
     /**
      * This function produces transactional data of a given no. of records 
followed by a transactional marker (COMMIT/ABORT).
      */
diff --git 
a/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java 
b/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java
index d5831d74853..1dcdb52c90a 100644
--- 
a/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java
+++ 
b/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java
@@ -48,7 +48,7 @@ public class InFlightState {
     private String memberId;
     // The state of the records before the transition. In case we need to 
revert an in-flight state, we revert the above
     // attributes of InFlightState to this state, namely - state, 
deliveryCount and memberId.
-    private InFlightState rollbackState;
+    private RollbackState rollbackState;
     // The timer task for the acquisition lock timeout.
     private AcquisitionLockTimerTask acquisitionLockTimeoutTask;
     // The boolean determines if the record has achieved a terminal state of 
ARCHIVED from which it cannot transition
@@ -205,7 +205,7 @@ public class InFlightState {
         InFlightState currentState = new InFlightState(state, deliveryCount, 
memberId, acquisitionLockTimeoutTask);
         InFlightState updatedState = tryUpdateState(newState, ops, 
maxDeliveryCount, newMemberId);
         if (updatedState != null) {
-            rollbackState = currentState;
+            rollbackState = new RollbackState(currentState, maxDeliveryCount);
         }
         return updatedState;
     }
@@ -224,16 +224,23 @@ public class InFlightState {
             rollbackState = null;
             return;
         }
+        InFlightState previousState = rollbackState.state();
         // Check is acquisition lock timeout task is expired then mark the 
message as Available.
         if (acquisitionLockTimeoutTask != null && 
acquisitionLockTimeoutTask.hasExpired()) {
-            state = RecordState.AVAILABLE;
+            // If the acquisition lock timeout task has expired, we should 
mark the record as available.
+            // However, if the delivery count has reached the maximum delivery 
count, we should archive the record.
+            state = previousState.deliveryCount() >= 
rollbackState.maxDeliveryCount ?
+                RecordState.ARCHIVED : RecordState.AVAILABLE;
             memberId = EMPTY_MEMBER_ID;
             cancelAndClearAcquisitionLockTimeoutTask();
         } else {
-            state = rollbackState.state;
-            memberId = rollbackState.memberId;
+            state = previousState.state();
+            memberId = previousState.memberId();
         }
-        deliveryCount = rollbackState.deliveryCount();
+        // Do not revert the delivery count as the delivery count should not 
be reverted for the failed
+        // state transition. However, in the current implementation, the 
delivery count is only incremented
+        // when the state is updated to Acquired, hence reverting the delivery 
count is not needed when
+        // the state transition fails.
         rollbackState = null;
     }
 
@@ -271,4 +278,12 @@ public class InFlightState {
             ", memberId=" + memberId +
             ")";
     }
+
+  /**
+   * This record is used to store the state before the transition. It is used 
to revert the state if the transition fails.
+   * @param state The state of the records before the transition.
+   * @param maxDeliveryCount The maximum delivery count for the record.
+   */
+    private record RollbackState(InFlightState state, int maxDeliveryCount) {
+    }
 }

Reply via email to