apoorvmittal10 commented on code in PR #20080:
URL: https://github.com/apache/kafka/pull/20080#discussion_r2179798245
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2088,6 +2088,9 @@ void rollbackOrProcessStateUpdates(
state.completeStateTransition(true);
// Cancel the acquisition lock timeout task for the state
since it is acknowledged/released successfully.
state.cancelAndClearAcquisitionLockTimeoutTask();
+ if (state.state != RecordState.ARCHIVED) {
Review Comment:
Should it be:
```suggestion
if (state.state == AVAILABLE) {
```
##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -7296,6 +7296,177 @@ public void testFetchLockReleasedByDifferentId() {
assertNull(sharePartition.fetchLock()); // Fetch lock has been
released.
}
+ @Test
+ public void testAcquireWhenBatchHasOngoingTransition() {
+ Persister persister = Mockito.mock(Persister.class);
+
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .withPersister(persister)
+ .build();
+ // Acquire a single batch with member-1.
+ fetchAcquiredRecords(
+ sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS,
21,
+ fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM
+ ), 10
+ );
+
+ // Validate that there is no ongoing transition.
+
assertFalse(sharePartition.cachedState().get(21L).batchHasOngoingStateTransition());
+ // Return a future which will be completed later, so the batch state
has ongoing transition.
+ CompletableFuture<WriteShareGroupStateResult> future = new
CompletableFuture<>();
+ Mockito.when(persister.writeState(Mockito.any())).thenReturn(future);
+ // Acknowledge batch to create ongoing transition.
+ sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(21, 30, List.of(AcknowledgeType.RELEASE.id))));
+
+ // Assert the start offset has not moved and batch has ongoing
transition.
+ assertEquals(21L, sharePartition.startOffset());
+ assertEquals(1, sharePartition.cachedState().size());
+
assertTrue(sharePartition.cachedState().get(21L).batchHasOngoingStateTransition());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(21L).batchState());
+ assertEquals(EMPTY_MEMBER_ID,
sharePartition.cachedState().get(21L).batchMemberId());
+
+ // Acquire the same batch with member-2. This function call will
return with 0 records since there is an ongoing
+ // transition for this batch.
+ fetchAcquiredRecords(
+ sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS,
21,
+ fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM
+ ), 0
+ );
+
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(21L).batchState());
+ assertEquals(EMPTY_MEMBER_ID,
sharePartition.cachedState().get(21L).batchMemberId());
+
+ // Complete the future so acknowledge API can be completed, which
updates the cache. Now the records can be acquired.
+ WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(),
Errors.NONE.message())))));
+ future.complete(writeShareGroupStateResult);
+
+ // Acquire the same batch with member-2. 10 records will be acquired.
+ fetchAcquiredRecords(
+ sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS,
21,
+ fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM
+ ), 10
+ );
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(21L).batchState());
+ assertEquals("member-2",
sharePartition.cachedState().get(21L).batchMemberId());
+ }
+
+ @Test
+ public void testNextFetchOffsetWhenBatchHasOngoingTransition() {
+ Persister persister = Mockito.mock(Persister.class);
+
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .withPersister(persister)
+ .build();
+
+ // Acquire a single batch 0-9 with member-1.
+ fetchAcquiredRecords(
+ sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 0,
+ fetchPartitionData(memoryRecords(10, 0)), FETCH_ISOLATION_HWM
+ ), 10
+ );
+
+ // Acquire a single batch 10-19 with member-1.
+ fetchAcquiredRecords(
+ sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS,
10,
+ fetchPartitionData(memoryRecords(10, 10)), FETCH_ISOLATION_HWM
+ ), 10
+ );
+
+ // Validate that there is no ongoing transition.
+ assertEquals(2, sharePartition.cachedState().size());
+
assertFalse(sharePartition.cachedState().get(0L).batchHasOngoingStateTransition());
+
assertFalse(sharePartition.cachedState().get(10L).batchHasOngoingStateTransition());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(0L).batchState());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(10L).batchState());
+
+ // Return futures which will be completed later, so the batch state
has ongoing transition.
+ CompletableFuture<WriteShareGroupStateResult> future1 = new
CompletableFuture<>();
+ CompletableFuture<WriteShareGroupStateResult> future2 = new
CompletableFuture<>();
+
+ // Mocking the persister write state RPC to return future 1 when
acknowledgement occurs for offsets 0-9.
+ // Mocking the persister write state RPC to return future 2 when
acknowledgement occurs for offsets 10-19.
+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(future1).thenReturn(future2);
+
+ // Acknowledge batch to create ongoing transition.
+ sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(0, 9, List.of(AcknowledgeType.RELEASE.id))));
+ sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(10, 19, List.of(AcknowledgeType.RELEASE.id))));
+
+ // Complete future2 so second acknowledge API can be completed, which
updates the cache.
+ WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(),
Errors.NONE.message())))));
+ future2.complete(writeShareGroupStateResult);
+
+ // Offsets 0-9 will have ongoing state transition since future1 is not
complete yet.
+ // Offsets 10-19 won't have ongoing state transition since future2 has
been completed.
+
assertTrue(sharePartition.cachedState().get(0L).batchHasOngoingStateTransition());
+
assertFalse(sharePartition.cachedState().get(10L).batchHasOngoingStateTransition());
+
+ // nextFetchOffset should return 10 and not 0 because batch 0-9 is
undergoing state transition.
+ assertEquals(10, sharePartition.nextFetchOffset());
+ }
+
+ @Test
+ public void testNextFetchOffsetWhenOffsetsHaveOngoingTransition() {
+ Persister persister = Mockito.mock(Persister.class);
+
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .withPersister(persister)
+ .build();
+
+ // Acquire a single batch 0-50 with member-1.
+ fetchAcquiredRecords(
+ sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 0,
+ fetchPartitionData(memoryRecords(50, 0)), FETCH_ISOLATION_HWM
+ ), 50
+ );
+
+ // Validate that there is no ongoing transition.
+
assertFalse(sharePartition.cachedState().get(0L).batchHasOngoingStateTransition());
+
+ // Return futures which will be completed later, so the batch state
has ongoing transition.
+ CompletableFuture<WriteShareGroupStateResult> future1 = new
CompletableFuture<>();
+ CompletableFuture<WriteShareGroupStateResult> future2 = new
CompletableFuture<>();
+
+ // Mocking the persister write state RPC to return future 1 when
acknowledgement occurs for offsets 5-9.
+ // Mocking the persister write state RPC to return future 2 when
acknowledgement occurs for offsets 20-24.
Review Comment:
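Similar to the suggestion on the equivalent comment lines in
`testNextFetchOffsetWhenBatchHasOngoingTransition`: these two comment lines
can be merged into a single comment.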
##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -7296,6 +7296,177 @@ public void testFetchLockReleasedByDifferentId() {
assertNull(sharePartition.fetchLock()); // Fetch lock has been
released.
}
+ @Test
+ public void testAcquireWhenBatchHasOngoingTransition() {
+ Persister persister = Mockito.mock(Persister.class);
+
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .withPersister(persister)
+ .build();
+ // Acquire a single batch with member-1.
+ fetchAcquiredRecords(
+ sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS,
21,
+ fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM
+ ), 10
+ );
+
+ // Validate that there is no ongoing transition.
+
assertFalse(sharePartition.cachedState().get(21L).batchHasOngoingStateTransition());
+ // Return a future which will be completed later, so the batch state
has ongoing transition.
+ CompletableFuture<WriteShareGroupStateResult> future = new
CompletableFuture<>();
+ Mockito.when(persister.writeState(Mockito.any())).thenReturn(future);
+ // Acknowledge batch to create ongoing transition.
+ sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(21, 30, List.of(AcknowledgeType.RELEASE.id))));
+
+ // Assert the start offset has not moved and batch has ongoing
transition.
+ assertEquals(21L, sharePartition.startOffset());
+ assertEquals(1, sharePartition.cachedState().size());
+
assertTrue(sharePartition.cachedState().get(21L).batchHasOngoingStateTransition());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(21L).batchState());
+ assertEquals(EMPTY_MEMBER_ID,
sharePartition.cachedState().get(21L).batchMemberId());
+
+ // Acquire the same batch with member-2. This function call will
return with 0 records since there is an ongoing
+ // transition for this batch.
+ fetchAcquiredRecords(
+ sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS,
21,
+ fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM
+ ), 0
+ );
+
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(21L).batchState());
+ assertEquals(EMPTY_MEMBER_ID,
sharePartition.cachedState().get(21L).batchMemberId());
+
+ // Complete the future so acknowledge API can be completed, which
updates the cache. Now the records can be acquired.
+ WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(),
Errors.NONE.message())))));
+ future.complete(writeShareGroupStateResult);
+
+ // Acquire the same batch with member-2. 10 records will be acquired.
+ fetchAcquiredRecords(
+ sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS,
21,
+ fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM
+ ), 10
+ );
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(21L).batchState());
+ assertEquals("member-2",
sharePartition.cachedState().get(21L).batchMemberId());
+ }
+
+ @Test
+ public void testNextFetchOffsetWhenBatchHasOngoingTransition() {
+ Persister persister = Mockito.mock(Persister.class);
+
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .withPersister(persister)
+ .build();
+
+ // Acquire a single batch 0-9 with member-1.
+ fetchAcquiredRecords(
+ sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 0,
+ fetchPartitionData(memoryRecords(10, 0)), FETCH_ISOLATION_HWM
+ ), 10
+ );
+
+ // Acquire a single batch 10-19 with member-1.
+ fetchAcquiredRecords(
+ sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS,
10,
+ fetchPartitionData(memoryRecords(10, 10)), FETCH_ISOLATION_HWM
+ ), 10
+ );
+
+ // Validate that there is no ongoing transition.
+ assertEquals(2, sharePartition.cachedState().size());
+
assertFalse(sharePartition.cachedState().get(0L).batchHasOngoingStateTransition());
+
assertFalse(sharePartition.cachedState().get(10L).batchHasOngoingStateTransition());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(0L).batchState());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(10L).batchState());
+
+ // Return futures which will be completed later, so the batch state
has ongoing transition.
+ CompletableFuture<WriteShareGroupStateResult> future1 = new
CompletableFuture<>();
+ CompletableFuture<WriteShareGroupStateResult> future2 = new
CompletableFuture<>();
+
+ // Mocking the persister write state RPC to return future 1 when
acknowledgement occurs for offsets 0-9.
+ // Mocking the persister write state RPC to return future 2 when
acknowledgement occurs for offsets 10-19.
Review Comment:
```suggestion
// Mocking the persister write state RPC to return future 1 and future 2 when acknowledgement occurs for offsets 0-9 and 10-19 respectively.
```
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2088,6 +2088,9 @@ void rollbackOrProcessStateUpdates(
state.completeStateTransition(true);
// Cancel the acquisition lock timeout task for the state
since it is acknowledged/released successfully.
state.cancelAndClearAcquisitionLockTimeoutTask();
+ if (state.state != RecordState.ARCHIVED) {
+ findNextFetchOffset.set(true);
+ }
Review Comment:
This change here makes sense, but do we need to remove
`findNextFetchOffset.set(true);` at other places where we have just started the
transition, i.e. in acknowledgement?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]