omkreddy commented on code in PR #16274:
URL: https://github.com/apache/kafka/pull/16274#discussion_r1636921661
##########
core/src/test/java/kafka/server/SharePartitionTest.java:
##########
@@ -63,4 +111,172 @@ public void testRecordStateForId() {
// Invalid check.
assertThrows(IllegalArgumentException.class, () ->
RecordState.forId((byte) 5));
}
+
+ @Test
+ public void testAcquireSingleRecord() {
+ SharePartition sharePartition =
SharePartitionBuilder.builder().build();
+ MemoryRecords records = memoryRecords(1);
+
+ CompletableFuture<List<AcquiredRecords>> result =
sharePartition.acquire(
+ MEMBER_ID,
+ new FetchPartitionData(Errors.NONE, 3, 0, records,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
+ assertFalse(result.isCompletedExceptionally());
+
+ List<AcquiredRecords> acquiredRecordsList = result.join();
+ assertArrayEquals(expectedAcquiredRecords(records, 1).toArray(),
acquiredRecordsList.toArray());
+ assertEquals(1, sharePartition.nextFetchOffset());
+ assertEquals(1, sharePartition.cachedState().size());
+ assertEquals(0, sharePartition.cachedState().get(0L).firstOffset());
+ assertEquals(0, sharePartition.cachedState().get(0L).lastOffset());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(0L).batchState());
+ assertEquals(MEMBER_ID,
sharePartition.cachedState().get(0L).batchMemberId());
+ assertEquals(1,
sharePartition.cachedState().get(0L).batchDeliveryCount());
+ assertNull(sharePartition.cachedState().get(0L).offsetState());
+ }
+
+ @Test
+ public void testAcquireMultipleRecords() {
+ SharePartition sharePartition =
SharePartitionBuilder.builder().build();
+ MemoryRecords records = memoryRecords(5, 10);
+
+ CompletableFuture<List<AcquiredRecords>> result =
sharePartition.acquire(
+ MEMBER_ID,
+ new FetchPartitionData(Errors.NONE, 20, 3, records,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
+ assertFalse(result.isCompletedExceptionally());
+
+ List<AcquiredRecords> acquiredRecordsList = result.join();
+ assertArrayEquals(expectedAcquiredRecords(records, 1).toArray(),
acquiredRecordsList.toArray());
+ assertEquals(15, sharePartition.nextFetchOffset());
+ assertEquals(1, sharePartition.cachedState().size());
+ assertEquals(10, sharePartition.cachedState().get(10L).firstOffset());
+ assertEquals(14, sharePartition.cachedState().get(10L).lastOffset());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(10L).batchState());
+ assertEquals(MEMBER_ID,
sharePartition.cachedState().get(10L).batchMemberId());
+ assertEquals(1,
sharePartition.cachedState().get(10L).batchDeliveryCount());
+ assertNull(sharePartition.cachedState().get(10L).offsetState());
+ }
+
+ @Test
+ public void testAcquireMultipleRecordsWithOverlapAndNewBatch() {
+ SharePartition sharePartition =
SharePartitionBuilder.builder().build();
+ MemoryRecords records = memoryRecords(5, 0);
+
+ CompletableFuture<List<AcquiredRecords>> result =
sharePartition.acquire(
+ MEMBER_ID,
+ new FetchPartitionData(Errors.NONE, 20, 3, records,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
+ assertFalse(result.isCompletedExceptionally());
+
+ List<AcquiredRecords> acquiredRecordsList = result.join();
+ assertArrayEquals(expectedAcquiredRecords(records, 1).toArray(),
acquiredRecordsList.toArray());
+ assertEquals(5, sharePartition.nextFetchOffset());
+
+ // Add records from 0-9 offsets, 5-9 should be acquired and 0-4 should
be ignored.
+ records = memoryRecords(10, 0);
+ result = sharePartition.acquire(
+ MEMBER_ID,
+ new FetchPartitionData(Errors.NONE, 20, 3, records,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
+ assertFalse(result.isCompletedExceptionally());
+ acquiredRecordsList = result.join();
+ assertArrayEquals(expectedAcquiredRecords(memoryRecords(5, 5),
1).toArray(), acquiredRecordsList.toArray());
Review Comment:
Are there any tests that acquire multiple subset batches?
##########
core/src/main/java/kafka/server/SharePartition.java:
##########
@@ -238,8 +245,77 @@ public static RecordState forId(byte id) {
* @return The next fetch offset that should be fetched from the leader.
*/
public long nextFetchOffset() {
- // TODO: Implement the logic to compute the next fetch offset.
- return 0;
+ /*
+ The logic for determining the next offset to fetch data from a Share
Partition hinges on a
+ flag called findNextFetchOffset. If this flag is set to true, then the
next fetch offset
+ should be re-computed, otherwise the next fetch offset is Share
Partition End Offset + 1.
+ The flag is set to true in the following cases:
+ 1. When some previously acquired records are acknowledged with type
RELEASE.
+ 2. When the record lock duration expires for some acquired records.
+ 3. When some records are released on share session close.
+ The re-computation of next fetch offset is done by iterating over the
cachedState and finding
+ the first available record. If no available record is found, then the
next fetch offset is
+ set to Share Partition End Offset + 1 and findNextFetchOffset flag is
set to false.
+ */
+ lock.writeLock().lock();
+ try {
+ // When none of the records in the cachedState are in the
AVAILABLE state, findNextFetchOffset will be false
+ if (!findNextFetchOffset.get()) {
+ if (cachedState.isEmpty() || startOffset >
cachedState.lastEntry().getValue().lastOffset()) {
+ // 1. When cachedState is empty, endOffset is set to the
next offset of the last offset removed from
+ // batch, which is the next offset to be fetched.
+ // 2. When startOffset has moved beyond the in-flight
records, startOffset and endOffset point to the LSO,
+ // which is the next offset to be fetched.
+ return endOffset;
+ } else {
+ return endOffset + 1;
+ }
+ }
+
+ // If this piece of code is reached, it means that
findNextFetchOffset is true
Review Comment:
Do we have any tests that cover the case where findNextFetchOffset is true?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]