adixitconfluent commented on code in PR #20310:
URL: https://github.com/apache/kafka/pull/20310#discussion_r2259627890
##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -7878,6 +7878,107 @@ public void
testAcquisitionLockTimeoutWithWriteStateRPCFailure() throws Interrup
assertNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask());
}
+ @Test
+ public void testRecordArchivedWithWriteStateRPCFailure() throws
InterruptedException {
+ Persister persister = Mockito.mock(Persister.class);
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+ .withMaxDeliveryCount(2)
+ .withPersister(persister)
+ .build();
+
+ fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5);
+ fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 5);
+
+ // Futures which will be completed later, so the batch state has
ongoing transition.
+ CompletableFuture<WriteShareGroupStateResult> future1 = new
CompletableFuture<>();
+ CompletableFuture<WriteShareGroupStateResult> future2 = new
CompletableFuture<>();
+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(future1).thenReturn(future2);
+
+ // Acknowledge batches.
+ sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(3, 3, List.of(AcknowledgeType.ACCEPT.id))));
+ sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(7, 11, List.of(AcknowledgeType.ACCEPT.id))));
+
+ assertEquals(RecordState.ACKNOWLEDGED,
sharePartition.cachedState().get(2L).offsetState().get(3L).state());
+ assertEquals(1,
sharePartition.cachedState().get(2L).offsetState().get(3L).deliveryCount());
+ assertEquals(RecordState.ACKNOWLEDGED,
sharePartition.cachedState().get(7L).batchState());
+ assertEquals(1,
sharePartition.cachedState().get(7L).batchDeliveryCount());
+
+ WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0,
Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message())))));
+
+ future1.complete(writeShareGroupStateResult);
+ // The completion of future1 with exception should not impact the
cached state since those records have already
Review Comment:
I think this comment is incorrect
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]