This is an automated email from the ASF dual-hosted git repository.
mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new dc96e294995 KAFKA-19476: Correcting max delivery on write state
failure and lock timeout (#20310)
dc96e294995 is described below
commit dc96e2949958aa6020c594681d06509a0fc90f54
Author: Apoorv Mittal <[email protected]>
AuthorDate: Thu Aug 7 19:22:00 2025 +0100
KAFKA-19476: Correcting max delivery on write state failure and lock
timeout (#20310)
Fix the max delivery check on acquisition lock timeout and write state
RPC failure.
When the acquisition lock has already timed out and a write state RPC
failure occurs, we need to check whether the records should be archived.
However, with this fix we do not persist that information, even though it
is relevant because some records may have been archived or had their
delivery count bumped. The information will be persisted eventually.
The persister call has already failed, so issuing another persister call
in response to that failure is not correct. Instead, let the data be
persisted by future persister calls.
Reviewers: Manikumar Reddy <[email protected]>, Abhinav Dixit
<[email protected]>
---
.../kafka/server/share/SharePartitionTest.java | 99 ++++++++++++++++++++++
.../kafka/server/share/fetch/InFlightState.java | 27 ++++--
2 files changed, 120 insertions(+), 6 deletions(-)
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 1289d720054..30b49e17b16 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -7878,6 +7878,105 @@ public class SharePartitionTest {
assertNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask());
}
+ @Test
+ public void testRecordArchivedWithWriteStateRPCFailure() throws
InterruptedException {
+ Persister persister = Mockito.mock(Persister.class);
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+ .withMaxDeliveryCount(2)
+ .withPersister(persister)
+ .build();
+
+ fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5);
+ fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 5);
+
+ // Futures which will be completed later, so the batch state has
ongoing transition.
+ CompletableFuture<WriteShareGroupStateResult> future1 = new
CompletableFuture<>();
+ CompletableFuture<WriteShareGroupStateResult> future2 = new
CompletableFuture<>();
+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(future1).thenReturn(future2);
+
+ // Acknowledge batches.
+ sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(3, 3, List.of(AcknowledgeType.ACCEPT.id))));
+ sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(7, 11, List.of(AcknowledgeType.ACCEPT.id))));
+
+ assertEquals(RecordState.ACKNOWLEDGED,
sharePartition.cachedState().get(2L).offsetState().get(3L).state());
+ assertEquals(1,
sharePartition.cachedState().get(2L).offsetState().get(3L).deliveryCount());
+ assertEquals(RecordState.ACKNOWLEDGED,
sharePartition.cachedState().get(7L).batchState());
+ assertEquals(1,
sharePartition.cachedState().get(7L).batchDeliveryCount());
+
+ WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0,
Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message())))));
+
+ future1.complete(writeShareGroupStateResult);
+ assertEquals(12, sharePartition.nextFetchOffset());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(2L).offsetState().get(3L).state());
+ assertEquals(1,
sharePartition.cachedState().get(2L).offsetState().get(3L).deliveryCount());
+ assertEquals(RecordState.ACKNOWLEDGED,
sharePartition.cachedState().get(7L).batchState());
+ assertEquals(1,
sharePartition.cachedState().get(7L).batchDeliveryCount());
+
+ future2.complete(writeShareGroupStateResult);
+ assertEquals(12L, sharePartition.nextFetchOffset());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(2L).offsetState().get(3L).state());
+ assertEquals(1,
sharePartition.cachedState().get(2L).offsetState().get(3L).deliveryCount());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(7L).batchState());
+ assertEquals(1,
sharePartition.cachedState().get(7L).batchDeliveryCount());
+
+ // Allowing acquisition lock to expire. This will also ensure that
acquisition lock timeout task
+ // is run successfully post write state RPC failure.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+ TestUtils.waitForCondition(
+ () ->
sharePartition.cachedState().get(2L).offsetState().get(3L).state() ==
RecordState.AVAILABLE &&
+ sharePartition.cachedState().get(7L).batchState() ==
RecordState.AVAILABLE &&
+
sharePartition.cachedState().get(2L).offsetState().get(3L).deliveryCount() == 1
&&
+ sharePartition.cachedState().get(7L).batchDeliveryCount() == 1
&&
+ sharePartition.timer().size() == 0,
+ DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+ () -> assertionFailedMessage(sharePartition, Map.of(2L,
List.of(3L), 7L, List.of())));
+ // Acquisition lock timeout task has run already and next fetch offset
is moved to 2.
+ assertEquals(2, sharePartition.nextFetchOffset());
+ // Send the same batches again.
+ fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5);
+ fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 5);
+
+ future1 = new CompletableFuture<>();
+ future2 = new CompletableFuture<>();
+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(future1).thenReturn(future2);
+
+ sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(3, 3, List.of(AcknowledgeType.ACCEPT.id))));
+ sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(7, 11, List.of(AcknowledgeType.ACCEPT.id))));
+
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+ // Verify the timer tasks have run and the state is archived for the
offsets which are not acknowledged,
+ // but the acquisition lock timeout task should be just expired for
acknowledged offsets, though
+ // the state should not be archived.
+ TestUtils.waitForCondition(
+ () ->
sharePartition.cachedState().get(2L).offsetState().get(2L).state() ==
RecordState.ARCHIVED &&
+
sharePartition.cachedState().get(2L).offsetState().get(3L).state() ==
RecordState.ACKNOWLEDGED &&
+
sharePartition.cachedState().get(2L).offsetState().get(3L).acquisitionLockTimeoutTask().hasExpired()
&&
+ sharePartition.cachedState().get(7L).batchState() ==
RecordState.ACKNOWLEDGED &&
+
sharePartition.cachedState().get(7L).batchAcquisitionLockTimeoutTask().hasExpired(),
+ DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+ () -> assertionFailedMessage(sharePartition, Map.of(2L,
List.of(3L), 7L, List.of())));
+
+ future1.complete(writeShareGroupStateResult);
+ // Now the state should be archived for the offsets despite the write
state RPC failure, as the
+ // delivery count has reached the max delivery count and the
acquisition lock timeout task
+ // has already expired for the offsets which were acknowledged.
+ assertEquals(12, sharePartition.nextFetchOffset());
+ assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(2L).offsetState().get(3L).state());
+ assertEquals(2,
sharePartition.cachedState().get(2L).offsetState().get(3L).deliveryCount());
+ assertEquals(RecordState.ACKNOWLEDGED,
sharePartition.cachedState().get(7L).batchState());
+ assertEquals(2,
sharePartition.cachedState().get(7L).batchDeliveryCount());
+
+ future2.complete(writeShareGroupStateResult);
+ assertEquals(12L, sharePartition.nextFetchOffset());
+ assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(7L).batchState());
+ assertEquals(2,
sharePartition.cachedState().get(7L).batchDeliveryCount());
+ }
+
/**
* This function produces transactional data of a given no. of records
followed by a transactional marker (COMMIT/ABORT).
*/
diff --git
a/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java
b/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java
index d5831d74853..1dcdb52c90a 100644
---
a/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java
+++
b/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java
@@ -48,7 +48,7 @@ public class InFlightState {
private String memberId;
// The state of the records before the transition. In case we need to
revert an in-flight state, we revert the above
// attributes of InFlightState to this state, namely - state,
deliveryCount and memberId.
- private InFlightState rollbackState;
+ private RollbackState rollbackState;
// The timer task for the acquisition lock timeout.
private AcquisitionLockTimerTask acquisitionLockTimeoutTask;
// The boolean determines if the record has achieved a terminal state of
ARCHIVED from which it cannot transition
@@ -205,7 +205,7 @@ public class InFlightState {
InFlightState currentState = new InFlightState(state, deliveryCount,
memberId, acquisitionLockTimeoutTask);
InFlightState updatedState = tryUpdateState(newState, ops,
maxDeliveryCount, newMemberId);
if (updatedState != null) {
- rollbackState = currentState;
+ rollbackState = new RollbackState(currentState, maxDeliveryCount);
}
return updatedState;
}
@@ -224,16 +224,23 @@ public class InFlightState {
rollbackState = null;
return;
}
+ InFlightState previousState = rollbackState.state();
// Check is acquisition lock timeout task is expired then mark the
message as Available.
if (acquisitionLockTimeoutTask != null &&
acquisitionLockTimeoutTask.hasExpired()) {
- state = RecordState.AVAILABLE;
+ // If the acquisition lock timeout task has expired, we should
mark the record as available.
+ // However, if the delivery count has reached the maximum delivery
count, we should archive the record.
+ state = previousState.deliveryCount() >=
rollbackState.maxDeliveryCount ?
+ RecordState.ARCHIVED : RecordState.AVAILABLE;
memberId = EMPTY_MEMBER_ID;
cancelAndClearAcquisitionLockTimeoutTask();
} else {
- state = rollbackState.state;
- memberId = rollbackState.memberId;
+ state = previousState.state();
+ memberId = previousState.memberId();
}
- deliveryCount = rollbackState.deliveryCount();
+ // Do not revert the delivery count as the delivery count should not
be reverted for the failed
+ // state transition. However, in the current implementation, the
delivery count is only incremented
+ // when the state is updated to Acquired, hence reverting the delivery
count is not needed when
+ // the state transition fails.
rollbackState = null;
}
@@ -271,4 +278,12 @@ public class InFlightState {
", memberId=" + memberId +
")";
}
+
+ /**
+ * This record is used to store the state before the transition. It is used
to revert the state if the transition fails.
+ * @param state The state of the records before the transition.
+ * @param maxDeliveryCount The maximum delivery count for the record.
+ */
+ private record RollbackState(InFlightState state, int maxDeliveryCount) {
+ }
}