adixitconfluent commented on code in PR #20124:
URL: https://github.com/apache/kafka/pull/20124#discussion_r2203842529
##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -7467,6 +7468,164 @@ public void
testNextFetchOffsetWhenOffsetsHaveOngoingTransition() {
assertEquals(20, sharePartition.nextFetchOffset());
}
+ @Test
+ public void testLsoMovementWithWriteStateRPCFailuresInAck() {
+ Persister persister = Mockito.mock(Persister.class);
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .withPersister(persister)
+ .build();
+
+ fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5);
+ fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 5);
+
+ // Validate that there is no ongoing transition.
+
assertFalse(sharePartition.cachedState().get(2L).batchHasOngoingStateTransition());
+
assertFalse(sharePartition.cachedState().get(7L).batchHasOngoingStateTransition());
+
+ // Return futures which will be completed later, so the batch state
has ongoing transition.
+ CompletableFuture<WriteShareGroupStateResult> future1 = new
CompletableFuture<>();
+ CompletableFuture<WriteShareGroupStateResult> future2 = new
CompletableFuture<>();
+
+ // Mocking the persister write state RPC to return future 1 and future
2 when acknowledgement occurs for
+ // offsets 2-6 and 7-11 respectively.
+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(future1).thenReturn(future2);
+
+ // Acknowledge batch to create ongoing transition.
+ sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(2, 6, List.of(AcknowledgeType.RELEASE.id))));
+ sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(7, 11, List.of(AcknowledgeType.RELEASE.id))));
+
+        // Validate that there is an ongoing transition.
+
assertTrue(sharePartition.cachedState().get(2L).batchHasOngoingStateTransition());
+
assertTrue(sharePartition.cachedState().get(7L).batchHasOngoingStateTransition());
+
+ // LSO is at 9.
+ sharePartition.updateCacheAndOffsets(9);
+
+ // Start offset will be moved.
+ assertEquals(9, sharePartition.nextFetchOffset());
+ assertEquals(9, sharePartition.startOffset());
+ assertEquals(11, sharePartition.endOffset());
+ assertEquals(2, sharePartition.cachedState().size());
+ assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(2L).batchState());
+ assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(7L).offsetState().get(7L).state());
+ assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(7L).offsetState().get(8L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(7L).offsetState().get(9L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(7L).offsetState().get(10L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(7L).offsetState().get(11L).state());
+
+        // Complete future1 with an error result so acknowledgement for 2-6 offsets
will be completed.
+ WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0,
Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message())))));
+ future1.complete(writeShareGroupStateResult);
+
+        // The completion of future1 with an error result should not impact the
cached state since those records have already
+        // been archived.
+ assertEquals(9, sharePartition.nextFetchOffset());
+ assertEquals(9, sharePartition.startOffset());
+ assertEquals(11, sharePartition.endOffset());
+ assertEquals(2, sharePartition.cachedState().size());
+ assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(2L).batchState());
+ assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(7L).offsetState().get(7L).state());
+ assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(7L).offsetState().get(8L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(7L).offsetState().get(9L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(7L).offsetState().get(10L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(7L).offsetState().get(11L).state());
Review Comment:
This is intended because we are testing that the records that have been
marked ARCHIVED due to LSO movement should remain ARCHIVED even though we have
failures in Write State RPC for those records. Perhaps, I'll just remove the
last 2 asserts to avoid any confusion.
##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -7467,6 +7468,164 @@ public void
testNextFetchOffsetWhenOffsetsHaveOngoingTransition() {
assertEquals(20, sharePartition.nextFetchOffset());
}
+ @Test
+ public void testLsoMovementWithWriteStateRPCFailuresInAck() {
+ Persister persister = Mockito.mock(Persister.class);
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .withPersister(persister)
+ .build();
+
+ fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5);
+ fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 5);
+
+ // Validate that there is no ongoing transition.
+
assertFalse(sharePartition.cachedState().get(2L).batchHasOngoingStateTransition());
+
assertFalse(sharePartition.cachedState().get(7L).batchHasOngoingStateTransition());
+
+ // Return futures which will be completed later, so the batch state
has ongoing transition.
+ CompletableFuture<WriteShareGroupStateResult> future1 = new
CompletableFuture<>();
+ CompletableFuture<WriteShareGroupStateResult> future2 = new
CompletableFuture<>();
+
+ // Mocking the persister write state RPC to return future 1 and future
2 when acknowledgement occurs for
+ // offsets 2-6 and 7-11 respectively.
+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(future1).thenReturn(future2);
+
+ // Acknowledge batch to create ongoing transition.
+ sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(2, 6, List.of(AcknowledgeType.RELEASE.id))));
+ sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(7, 11, List.of(AcknowledgeType.RELEASE.id))));
+
+        // Validate that there is an ongoing transition.
+
assertTrue(sharePartition.cachedState().get(2L).batchHasOngoingStateTransition());
+
assertTrue(sharePartition.cachedState().get(7L).batchHasOngoingStateTransition());
+
+ // LSO is at 9.
+ sharePartition.updateCacheAndOffsets(9);
+
+ // Start offset will be moved.
+ assertEquals(9, sharePartition.nextFetchOffset());
+ assertEquals(9, sharePartition.startOffset());
+ assertEquals(11, sharePartition.endOffset());
+ assertEquals(2, sharePartition.cachedState().size());
+ assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(2L).batchState());
+ assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(7L).offsetState().get(7L).state());
+ assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(7L).offsetState().get(8L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(7L).offsetState().get(9L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(7L).offsetState().get(10L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(7L).offsetState().get(11L).state());
+
+        // Complete future1 with an error result so acknowledgement for 2-6 offsets
will be completed.
+ WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0,
Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message())))));
+ future1.complete(writeShareGroupStateResult);
+
+        // The completion of future1 with an error result should not impact the
cached state since those records have already
+        // been archived.
+ assertEquals(9, sharePartition.nextFetchOffset());
+ assertEquals(9, sharePartition.startOffset());
+ assertEquals(11, sharePartition.endOffset());
+ assertEquals(2, sharePartition.cachedState().size());
+ assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(2L).batchState());
+ assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(7L).offsetState().get(7L).state());
+ assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(7L).offsetState().get(8L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(7L).offsetState().get(9L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(7L).offsetState().get(10L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(7L).offsetState().get(11L).state());
+ }
+
+ @Test
+ public void inFlightStateRollbackAndArchiveStateTransition() throws
InterruptedException {
+ InFlightState inFlightState = new InFlightState(RecordState.ACQUIRED,
1, MEMBER_ID);
+
+ inFlightState.startStateTransition(RecordState.ACKNOWLEDGED,
SharePartition.DeliveryCountOps.INCREASE, MAX_DELIVERY_COUNT, MEMBER_ID);
+ assertTrue(inFlightState.hasOngoingStateTransition());
+
+ // We have an ongoing state transition from ACQUIRED to ACKNOWLEDGED
which is not committed yet. At the same
+ // time when we have a call to completeStateTransition with false
commit value, we get a call to ARCHIVE the record.
+ // No matter the order of the 2 calls, we should always be getting the
final state as ARCHIVED.
+ ExecutorService executorService = Executors.newFixedThreadPool(2);
+ try {
+ List<Callable<Void>> callables = List.of(
+ () -> {
+ inFlightState.archive("member-2");
+ return null;
+ },
+ () -> {
+ inFlightState.completeStateTransition(false);
+ return null;
+ }
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]