This is an automated email from the ASF dual-hosted git repository.
mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c5d0ddd6f71 MINOR: Refactored gap window names in share partition
(#20411)
c5d0ddd6f71 is described below
commit c5d0ddd6f715d1b194446aa045e3fb22712aee50
Author: Apoorv Mittal <[email protected]>
AuthorDate: Wed Aug 27 10:06:43 2025 +0100
MINOR: Refactored gap window names in share partition (#20411)
As per the suggestion by @adixitconfluent and @chirag-wadhwa5,
[here](https://github.com/apache/kafka/pull/20395#discussion_r2300810004),
I have refactored the code with variable and method names.
Reviewers: Andrew Schofield <[email protected]>, Chirag Wadhwa
<[email protected]>
---
.../java/kafka/server/share/SharePartition.java | 74 +++---
.../kafka/server/share/SharePartitionTest.java | 261 +++++++++++----------
2 files changed, 168 insertions(+), 167 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index 08a9539dbed..8ed094b85f1 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -260,10 +260,10 @@ public class SharePartition {
private long endOffset;
/**
- * The initial read gap offset tracks if there are any gaps in the
in-flight batch during initial
- * read of the share partition state from the persister.
+ * The persister read result gap window tracks if there are any gaps in
the in-flight batch during
+ * initial read of the share partition state from the persister.
*/
- private InitialReadGapOffset initialReadGapOffset;
+ private GapWindow persisterReadResultGapWindow;
/**
* We maintain the latest fetch offset and its metadata to estimate the
minBytes requirement more efficiently.
@@ -475,9 +475,9 @@ public class SharePartition {
// in the cached state are not missed
updateFindNextFetchOffset(true);
endOffset =
cachedState.lastEntry().getValue().lastOffset();
- // initialReadGapOffset is not required, if there are no
gaps in the read state response
+ // gapWindow is not required, if there are no gaps in the
read state response
if (gapStartOffset != -1) {
- initialReadGapOffset = new
InitialReadGapOffset(endOffset, gapStartOffset);
+ persisterReadResultGapWindow = new
GapWindow(endOffset, gapStartOffset);
}
// In case the persister read state RPC result contains no
AVAILABLE records, we can update cached state
// and start/end offsets.
@@ -561,20 +561,20 @@ public class SharePartition {
}
long nextFetchOffset = -1;
- long gapStartOffset = isInitialReadGapOffsetWindowActive() ?
initialReadGapOffset.gapStartOffset() : -1;
+ long gapStartOffset = isPersisterReadGapWindowActive() ?
persisterReadResultGapWindow.gapStartOffset() : -1;
for (Map.Entry<Long, InFlightBatch> entry :
cachedState.entrySet()) {
// Check if there exists any gap in the in-flight batch which
needs to be fetched. If
- // initialReadGapOffset's endOffset is equal to the share
partition's endOffset, then
+ // gapWindow's endOffset is equal to the share partition's
endOffset, then
// only the initial gaps should be considered. Once share
partition's endOffset is past
// initial read end offset then all gaps are anyway fetched.
- if (isInitialReadGapOffsetWindowActive()) {
+ if (isPersisterReadGapWindowActive()) {
if (entry.getKey() > gapStartOffset) {
nextFetchOffset = gapStartOffset;
break;
}
// If the gapStartOffset is already past the last offset
of the in-flight batch,
// then do not consider this batch for finding the next
fetch offset. For example,
- // consider during initialization, the
initialReadGapOffset is set to 5 and the
+ // consider during initialization, the gapWindow is set to
5 and the
// first cached batch is 15-18. First read will happen at
offset 5 and say the data
// fetched is [5-6], now next fetch offset should be 7.
This works fine but say
// subsequent read returns batch 8-11, and the
gapStartOffset will be 12. Without
@@ -769,10 +769,10 @@ public class SharePartition {
}
InFlightBatch inFlightBatch = entry.getValue();
- // If the initialReadGapOffset window is active, we need to
treat the gaps in between the window as
+ // If the gapWindow window is active, we need to treat the
gaps in between the window as
// acquirable. Once the window is inactive (when we have
acquired all the gaps inside the window),
// the remaining gaps are natural (data does not exist at
those offsets) and we need not acquire them.
- if (isInitialReadGapOffsetWindowActive()) {
+ if (isPersisterReadGapWindowActive()) {
// If nextBatchStartOffset is less than the key of the
entry, this means the fetch happened for a gap in the cachedState.
// Thus, a new batch needs to be acquired for the gap.
if (maybeGapStartOffset < entry.getKey()) {
@@ -858,7 +858,7 @@ public class SharePartition {
acquiredCount += shareAcquiredRecords.count();
}
if (!result.isEmpty()) {
- maybeUpdateReadGapFetchOffset(result.get(result.size() -
1).lastOffset() + 1);
+
maybeUpdatePersisterGapWindowStartOffset(result.get(result.size() -
1).lastOffset() + 1);
return
maybeFilterAbortedTransactionalAcquiredRecords(fetchPartitionData,
isolationLevel, new ShareAcquiredRecords(result, acquiredCount));
}
return new ShareAcquiredRecords(result, acquiredCount);
@@ -1469,20 +1469,20 @@ public class SharePartition {
}
// Method to reduce the window that tracks gaps in the cachedState
- private void maybeUpdateReadGapFetchOffset(long offset) {
+ private void maybeUpdatePersisterGapWindowStartOffset(long offset) {
lock.writeLock().lock();
try {
- if (initialReadGapOffset != null) {
- // When last cached batch for initial read gap window is
acquired, then endOffset is
- // same as the initialReadGapOffset's endOffset, but the gap
offset to update is
- // endOffset + 1. Hence, do not update the gap start offset if
the request offset
+ if (persisterReadResultGapWindow != null) {
+ // When last cached batch for persister's read gap window is
acquired, then endOffset is
+ // same as the gapWindow's endOffset, but the gap offset to
update in the method call
+ // is endOffset + 1. Hence, do not update the gap start offset
if the request offset
// is ahead of the endOffset.
- if (initialReadGapOffset.endOffset() == endOffset && offset <=
initialReadGapOffset.endOffset()) {
- initialReadGapOffset.gapStartOffset(offset);
+ if (persisterReadResultGapWindow.endOffset() == endOffset &&
offset <= persisterReadResultGapWindow.endOffset()) {
+ persisterReadResultGapWindow.gapStartOffset(offset);
} else {
- // The initial read gap offset is not valid anymore as the
end offset has moved
- // beyond the initial read gap offset. Hence, reset the
initial read gap offset.
- initialReadGapOffset = null;
+ // The persister's read gap window is not valid anymore as
the end offset has moved
+ // beyond the read gap window's endOffset. Hence, set the
gap window to null.
+ persisterReadResultGapWindow = null;
}
}
} finally {
@@ -1570,14 +1570,14 @@ public class SharePartition {
// batches align on batch boundaries. Hence, reset to last
offset itself if the batch's
// last offset is greater than the last offset for
acquisition, else there could be
// a situation where the batch overlaps with the initial read
gap offset window batch.
- // For example, if the initial read gap offset window is 10-30
i.e. initialReadGapOffset's
+ // For example, if the initial read gap offset window is 10-30
i.e. gapWindow's
// startOffset is 10 and endOffset is 30, and the first
persister's read batch is 15-30.
// Say first fetched batch from log is 10-30 and
maxFetchRecords is 1, then the lastOffset
// in this method call would be 14. As the maxFetchRecords is
lesser than the batch,
// hence last batch offset for request offset is fetched. In
this example it will
// be 30, hence check if the initial read gap offset window is
active and the last acquired
// offset should be adjusted to 14 instead of 30.
- if (isInitialReadGapOffsetWindowActive() && lastAcquiredOffset
> lastOffset) {
+ if (isPersisterReadGapWindowActive() && lastAcquiredOffset >
lastOffset) {
lastAcquiredOffset = lastOffset;
}
}
@@ -1596,7 +1596,7 @@ public class SharePartition {
if (lastAcquiredOffset > endOffset) {
endOffset = lastAcquiredOffset;
}
- maybeUpdateReadGapFetchOffset(lastAcquiredOffset + 1);
+ maybeUpdatePersisterGapWindowStartOffset(lastAcquiredOffset + 1);
return new ShareAcquiredRecords(acquiredRecords, (int)
(lastAcquiredOffset - firstAcquiredOffset + 1));
} finally {
lock.writeLock().unlock();
@@ -2203,15 +2203,15 @@ public class SharePartition {
// If the lastOffsetAcknowledged is equal to the last offset of
entry, then the entire batch can potentially be removed.
if (lastOffsetAcknowledged == entry.getValue().lastOffset()) {
startOffset = cachedState.higherKey(lastOffsetAcknowledged);
- if (isInitialReadGapOffsetWindowActive()) {
+ if (isPersisterReadGapWindowActive()) {
// This case will arise if we have a situation where there
is an acquirable gap after the lastOffsetAcknowledged.
// Ex, the cachedState has following state batches -> {(0,
10), (11, 20), (31,40)} and all these batches are acked.
- // There is a gap from 21 to 30. Let the
initialReadGapOffset.gapStartOffset be 21. In this case,
+ // There is a gap from 21 to 30. Let the gapWindow's
gapStartOffset be 21. In this case,
// lastOffsetAcknowledged will be 20, but we cannot simply
move the start offset to the first offset
// of next cachedState batch (next cachedState batch is 31
to 40). There is an acquirable gap in between (21 to 30)
- // and The startOffset should be at 21. Hence, we set
startOffset to the minimum of initialReadGapOffset.gapStartOffset
+ // and The startOffset should be at 21. Hence, we set
startOffset to the minimum of gapWindow.gapStartOffset
// and higher key of lastOffsetAcknowledged
- startOffset =
Math.min(initialReadGapOffset.gapStartOffset(), startOffset);
+ startOffset =
Math.min(persisterReadResultGapWindow.gapStartOffset(), startOffset);
}
lastKeyToRemove = entry.getKey();
} else {
@@ -2276,8 +2276,8 @@ public class SharePartition {
return isRecordStateAcknowledged(startOffsetState);
}
- private boolean isInitialReadGapOffsetWindowActive() {
- return initialReadGapOffset != null &&
initialReadGapOffset.endOffset() == endOffset;
+ private boolean isPersisterReadGapWindowActive() {
+ return persisterReadResultGapWindow != null &&
persisterReadResultGapWindow.endOffset() == endOffset;
}
/**
@@ -2300,7 +2300,7 @@ public class SharePartition {
for (NavigableMap.Entry<Long, InFlightBatch> entry :
cachedState.entrySet()) {
InFlightBatch inFlightBatch = entry.getValue();
- if (isInitialReadGapOffsetWindowActive() &&
inFlightBatch.lastOffset() >= initialReadGapOffset.gapStartOffset()) {
+ if (isPersisterReadGapWindowActive() &&
inFlightBatch.lastOffset() >= persisterReadResultGapWindow.gapStartOffset()) {
return lastOffsetAcknowledged;
}
@@ -2865,8 +2865,8 @@ public class SharePartition {
}
// Visible for testing
- InitialReadGapOffset initialReadGapOffset() {
- return initialReadGapOffset;
+ GapWindow persisterReadResultGapWindow() {
+ return persisterReadResultGapWindow;
}
// Visible for testing.
@@ -2875,17 +2875,17 @@ public class SharePartition {
}
/**
- * The InitialReadGapOffset class is used to record the gap start and end
offset of the probable gaps
+ * The GapWindow class is used to record the gap start and end offset of
the probable gaps
* of available records which are neither known to Persister nor to
SharePartition. Share Partition
* will use this information to determine the next fetch offset and should
try to fetch the records
* in the gap.
*/
// Visible for Testing
- static class InitialReadGapOffset {
+ static class GapWindow {
private final long endOffset;
private long gapStartOffset;
- InitialReadGapOffset(long endOffset, long gapStartOffset) {
+ GapWindow(long endOffset, long gapStartOffset) {
this.endOffset = endOffset;
this.gapStartOffset = gapStartOffset;
}
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 75f27ec5382..47e214a716f 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -17,6 +17,7 @@
package kafka.server.share;
import kafka.server.ReplicaManager;
+import kafka.server.share.SharePartition.GapWindow;
import kafka.server.share.SharePartition.SharePartitionState;
import kafka.server.share.SharePartitionManager.SharePartitionListener;
@@ -965,11 +966,11 @@ public class SharePartitionTest {
assertEquals(3,
sharePartition.cachedState().get(21L).batchDeliveryCount());
assertNull(sharePartition.cachedState().get(21L).offsetState());
- SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
- assertNotNull(initialReadGapOffset);
+ GapWindow persisterReadResultGapWindow =
sharePartition.persisterReadResultGapWindow();
+ assertNotNull(persisterReadResultGapWindow);
- assertEquals(10, initialReadGapOffset.gapStartOffset());
- assertEquals(30, initialReadGapOffset.endOffset());
+ assertEquals(10, persisterReadResultGapWindow.gapStartOffset());
+ assertEquals(30, persisterReadResultGapWindow.endOffset());
}
@Test
@@ -1010,11 +1011,11 @@ public class SharePartitionTest {
assertEquals(3,
sharePartition.cachedState().get(30L).batchDeliveryCount());
assertNull(sharePartition.cachedState().get(30L).offsetState());
- SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
- assertNotNull(initialReadGapOffset);
+ GapWindow persisterReadResultGapWindow =
sharePartition.persisterReadResultGapWindow();
+ assertNotNull(persisterReadResultGapWindow);
- assertEquals(10, initialReadGapOffset.gapStartOffset());
- assertEquals(40, initialReadGapOffset.endOffset());
+ assertEquals(10, persisterReadResultGapWindow.gapStartOffset());
+ assertEquals(40, persisterReadResultGapWindow.endOffset());
}
@Test
@@ -1051,11 +1052,11 @@ public class SharePartitionTest {
assertEquals(3,
sharePartition.cachedState().get(30L).batchDeliveryCount());
assertNull(sharePartition.cachedState().get(30L).offsetState());
- SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
- assertNotNull(initialReadGapOffset);
+ GapWindow persisterReadResultGapWindow =
sharePartition.persisterReadResultGapWindow();
+ assertNotNull(persisterReadResultGapWindow);
- assertEquals(21, initialReadGapOffset.gapStartOffset());
- assertEquals(40, initialReadGapOffset.endOffset());
+ assertEquals(21, persisterReadResultGapWindow.gapStartOffset());
+ assertEquals(40, persisterReadResultGapWindow.endOffset());
}
@Test
@@ -1082,10 +1083,10 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(31, sharePartition.nextFetchOffset());
- SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
+ GapWindow persisterReadResultGapWindow =
sharePartition.persisterReadResultGapWindow();
- // Since there are no gaps present in the readState response,
initialReadGapOffset should be null
- assertNull(initialReadGapOffset);
+ // Since there are no gaps present in the readState response,
persisterReadResultGapWindow should be null
+ assertNull(persisterReadResultGapWindow);
}
@Test
@@ -1118,9 +1119,9 @@ public class SharePartitionTest {
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(15L).batchState());
assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(20L).batchState());
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(26L).batchState());
- assertNotNull(sharePartition.initialReadGapOffset());
- assertEquals(10L,
sharePartition.initialReadGapOffset().gapStartOffset());
- assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
+ assertNotNull(sharePartition.persisterReadResultGapWindow());
+ assertEquals(10L,
sharePartition.persisterReadResultGapWindow().gapStartOffset());
+ assertEquals(30L,
sharePartition.persisterReadResultGapWindow().endOffset());
// Create a single batch record that covers the entire range from 10
to 30 of initial read gap.
// The records in the batch are from 10 to 49.
@@ -1146,8 +1147,8 @@ public class SharePartitionTest {
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(15L).batchState());
assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(20L).batchState());
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(26L).batchState());
- assertNotNull(sharePartition.initialReadGapOffset());
- assertEquals(15L,
sharePartition.initialReadGapOffset().gapStartOffset());
+ assertNotNull(sharePartition.persisterReadResultGapWindow());
+ assertEquals(15L,
sharePartition.persisterReadResultGapWindow().gapStartOffset());
// Send the same batch again to acquire the next set of records.
acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
@@ -1180,8 +1181,8 @@ public class SharePartitionTest {
assertEquals(1,
sharePartition.cachedState().get(23L).batchDeliveryCount());
assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(26L).batchState());
assertEquals(30L, sharePartition.endOffset());
- // As all the gaps are now filled, the initialReadGapOffset should be
null.
- assertNull(sharePartition.initialReadGapOffset());
+ // As all the gaps are now filled, the persisterReadResultGapWindow
should be null.
+ assertNull(sharePartition.persisterReadResultGapWindow());
// Now initial read gap is filled, so the complete batch can be
acquired despite max fetch records being 1.
acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
@@ -1234,9 +1235,9 @@ public class SharePartitionTest {
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(15L).batchState());
assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(20L).batchState());
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(26L).batchState());
- assertNotNull(sharePartition.initialReadGapOffset());
- assertEquals(10L,
sharePartition.initialReadGapOffset().gapStartOffset());
- assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
+ assertNotNull(sharePartition.persisterReadResultGapWindow());
+ assertEquals(10L,
sharePartition.persisterReadResultGapWindow().gapStartOffset());
+ assertEquals(30L,
sharePartition.persisterReadResultGapWindow().endOffset());
// Create a single batch record that covers the entire range from 10
to 30 of initial read gap.
// The records in the batch are from 10 to 49.
@@ -1278,8 +1279,8 @@ public class SharePartitionTest {
assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(26L).batchState());
assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(31L).batchState());
assertEquals(49L, sharePartition.endOffset());
- // As all the gaps are now filled, the initialReadGapOffset should be
null.
- assertNull(sharePartition.initialReadGapOffset());
+ // As all the gaps are now filled, the persisterReadResultGapWindow
should be null.
+ assertNull(sharePartition.persisterReadResultGapWindow());
}
@Test
@@ -1312,9 +1313,9 @@ public class SharePartitionTest {
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(15L).batchState());
assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(20L).batchState());
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(26L).batchState());
- assertNotNull(sharePartition.initialReadGapOffset());
- assertEquals(10L,
sharePartition.initialReadGapOffset().gapStartOffset());
- assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
+ assertNotNull(sharePartition.persisterReadResultGapWindow());
+ assertEquals(10L,
sharePartition.persisterReadResultGapWindow().gapStartOffset());
+ assertEquals(30L,
sharePartition.persisterReadResultGapWindow().endOffset());
// Create a single batch record that ends in between the cached batch
and the fetch offset is
// post startOffset.
@@ -1357,8 +1358,8 @@ public class SharePartitionTest {
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(26L).offsetState().get(29L).state());
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(26L).offsetState().get(30L).state());
assertEquals(30L, sharePartition.endOffset());
- assertNotNull(sharePartition.initialReadGapOffset());
- assertEquals(28L,
sharePartition.initialReadGapOffset().gapStartOffset());
+ assertNotNull(sharePartition.persisterReadResultGapWindow());
+ assertEquals(28L,
sharePartition.persisterReadResultGapWindow().gapStartOffset());
}
@Test
@@ -1391,9 +1392,9 @@ public class SharePartitionTest {
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(15L).batchState());
assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(20L).batchState());
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(26L).batchState());
- assertNotNull(sharePartition.initialReadGapOffset());
- assertEquals(10L,
sharePartition.initialReadGapOffset().gapStartOffset());
- assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
+ assertNotNull(sharePartition.persisterReadResultGapWindow());
+ assertEquals(10L,
sharePartition.persisterReadResultGapWindow().gapStartOffset());
+ assertEquals(30L,
sharePartition.persisterReadResultGapWindow().endOffset());
// Create a single batch record where first offset is prior
startOffset.
MemoryRecords records = memoryRecords(16, 6);
@@ -1425,8 +1426,8 @@ public class SharePartitionTest {
assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(20L).batchState());
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(26L).batchState());
assertEquals(30L, sharePartition.endOffset());
- assertNotNull(sharePartition.initialReadGapOffset());
- assertEquals(20L,
sharePartition.initialReadGapOffset().gapStartOffset());
+ assertNotNull(sharePartition.persisterReadResultGapWindow());
+ assertEquals(20L,
sharePartition.persisterReadResultGapWindow().gapStartOffset());
}
@Test
@@ -1459,9 +1460,9 @@ public class SharePartitionTest {
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(15L).batchState());
assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(20L).batchState());
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(26L).batchState());
- assertNotNull(sharePartition.initialReadGapOffset());
- assertEquals(5L,
sharePartition.initialReadGapOffset().gapStartOffset());
- assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
+ assertNotNull(sharePartition.persisterReadResultGapWindow());
+ assertEquals(5L,
sharePartition.persisterReadResultGapWindow().gapStartOffset());
+ assertEquals(30L,
sharePartition.persisterReadResultGapWindow().endOffset());
// Create multiple batch records that covers the entire range from 5
to 30 of initial read gap.
// The records in the batch are from 5 to 49.
@@ -1496,8 +1497,8 @@ public class SharePartitionTest {
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(15L).batchState());
assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(20L).batchState());
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(26L).batchState());
- assertNotNull(sharePartition.initialReadGapOffset());
- assertEquals(7L,
sharePartition.initialReadGapOffset().gapStartOffset());
+ assertNotNull(sharePartition.persisterReadResultGapWindow());
+ assertEquals(7L,
sharePartition.persisterReadResultGapWindow().gapStartOffset());
// Remove first batch from the records as the fetch offset has moved
forward to 7 offset.
List<RecordBatch> batch = TestUtils.toList(records.batches());
@@ -1524,8 +1525,8 @@ public class SharePartitionTest {
assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(20L).batchState());
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(26L).batchState());
assertEquals(30L, sharePartition.endOffset());
- assertNotNull(sharePartition.initialReadGapOffset());
- assertEquals(12L,
sharePartition.initialReadGapOffset().gapStartOffset());
+ assertNotNull(sharePartition.persisterReadResultGapWindow());
+ assertEquals(12L,
sharePartition.persisterReadResultGapWindow().gapStartOffset());
// Remove the next 2 batches from the records as the fetch offset has
moved forward to 12 offset.
int size = batch.get(1).sizeInBytes() + batch.get(2).sizeInBytes();
@@ -1561,8 +1562,8 @@ public class SharePartitionTest {
assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(23L).batchState());
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(26L).batchState());
assertEquals(30L, sharePartition.endOffset());
- assertNotNull(sharePartition.initialReadGapOffset());
- assertEquals(26L,
sharePartition.initialReadGapOffset().gapStartOffset());
+ assertNotNull(sharePartition.persisterReadResultGapWindow());
+ assertEquals(26L,
sharePartition.persisterReadResultGapWindow().gapStartOffset());
// Remove the next 2 batches from the records as the fetch offset has
moved forward to 26 offset.
// Do not remove the 5th batch as it's only partially acquired.
@@ -1590,8 +1591,8 @@ public class SharePartitionTest {
assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(31L).batchState());
assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(26L).batchState());
assertEquals(49L, sharePartition.endOffset());
- // As all the gaps are now filled, the initialReadGapOffset should be
null.
- assertNull(sharePartition.initialReadGapOffset());
+ // As all the gaps are now filled, the persisterReadResultGapWindow
should be null.
+ assertNull(sharePartition.persisterReadResultGapWindow());
}
@Test
@@ -1624,9 +1625,9 @@ public class SharePartitionTest {
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(15L).batchState());
assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(20L).batchState());
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(26L).batchState());
- assertNotNull(sharePartition.initialReadGapOffset());
- assertEquals(5L,
sharePartition.initialReadGapOffset().gapStartOffset());
- assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
+ assertNotNull(sharePartition.persisterReadResultGapWindow());
+ assertEquals(5L,
sharePartition.persisterReadResultGapWindow().gapStartOffset());
+ assertEquals(30L,
sharePartition.persisterReadResultGapWindow().endOffset());
// Create multiple batch records that ends in between the cached batch
and the fetch offset is
// post startOffset.
@@ -1676,8 +1677,8 @@ public class SharePartitionTest {
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(26L).offsetState().get(29L).state());
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(26L).offsetState().get(30L).state());
assertEquals(30L, sharePartition.endOffset());
- assertNotNull(sharePartition.initialReadGapOffset());
- assertEquals(28L,
sharePartition.initialReadGapOffset().gapStartOffset());
+ assertNotNull(sharePartition.persisterReadResultGapWindow());
+ assertEquals(28L,
sharePartition.persisterReadResultGapWindow().gapStartOffset());
}
@Test
@@ -1710,9 +1711,9 @@ public class SharePartitionTest {
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(15L).batchState());
assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(20L).batchState());
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(26L).batchState());
- assertNotNull(sharePartition.initialReadGapOffset());
- assertEquals(10L,
sharePartition.initialReadGapOffset().gapStartOffset());
- assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
+ assertNotNull(sharePartition.persisterReadResultGapWindow());
+ assertEquals(10L,
sharePartition.persisterReadResultGapWindow().gapStartOffset());
+ assertEquals(30L,
sharePartition.persisterReadResultGapWindow().endOffset());
// Create multiple batch records where multiple batches base offsets
are prior startOffset.
ByteBuffer buffer = ByteBuffer.allocate(4096);
@@ -1750,8 +1751,8 @@ public class SharePartitionTest {
assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(20L).batchState());
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(26L).batchState());
assertEquals(30L, sharePartition.endOffset());
- assertNotNull(sharePartition.initialReadGapOffset());
- assertEquals(20L,
sharePartition.initialReadGapOffset().gapStartOffset());
+ assertNotNull(sharePartition.persisterReadResultGapWindow());
+ assertEquals(20L,
sharePartition.persisterReadResultGapWindow().gapStartOffset());
}
@Test
@@ -3034,12 +3035,12 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(16, sharePartition.nextFetchOffset());
- SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
- assertNotNull(initialReadGapOffset);
+ GapWindow persisterReadResultGapWindow =
sharePartition.persisterReadResultGapWindow();
+ assertNotNull(persisterReadResultGapWindow);
- // After records are acquired, the initialReadGapOffset should be
updated
- assertEquals(16, initialReadGapOffset.gapStartOffset());
- assertEquals(40, initialReadGapOffset.endOffset());
+ // After records are acquired, the persisterReadResultGapWindow should
be updated
+ assertEquals(16, persisterReadResultGapWindow.gapStartOffset());
+ assertEquals(40, persisterReadResultGapWindow.endOffset());
}
@Test
@@ -3073,12 +3074,12 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(41, sharePartition.nextFetchOffset());
- SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
- assertNotNull(initialReadGapOffset);
+ GapWindow persisterReadResultGapWindow =
sharePartition.persisterReadResultGapWindow();
+ assertNotNull(persisterReadResultGapWindow);
- // After records are acquired, the initialReadGapOffset should be
updated
- assertEquals(21, initialReadGapOffset.gapStartOffset());
- assertEquals(40, initialReadGapOffset.endOffset());
+ // After records are acquired, the persisterReadResultGapWindow should
be updated
+ assertEquals(21, persisterReadResultGapWindow.gapStartOffset());
+ assertEquals(40, persisterReadResultGapWindow.endOffset());
}
@Test
@@ -3126,12 +3127,12 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(26, sharePartition.nextFetchOffset());
- SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
- assertNotNull(initialReadGapOffset);
+ GapWindow persisterReadResultGapWindow =
sharePartition.persisterReadResultGapWindow();
+ assertNotNull(persisterReadResultGapWindow);
- // After records are acquired, the initialReadGapOffset should be
updated
- assertEquals(26, initialReadGapOffset.gapStartOffset());
- assertEquals(40, initialReadGapOffset.endOffset());
+ // After records are acquired, the persisterReadResultGapWindow should
be updated
+ assertEquals(26, persisterReadResultGapWindow.gapStartOffset());
+ assertEquals(40, persisterReadResultGapWindow.endOffset());
}
@Test
@@ -3170,12 +3171,12 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(26, sharePartition.nextFetchOffset());
- SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
- assertNotNull(initialReadGapOffset);
+ GapWindow persisterReadResultGapWindow =
sharePartition.persisterReadResultGapWindow();
+ assertNotNull(persisterReadResultGapWindow);
- // After records are acquired, the initialReadGapOffset should be
updated
- assertEquals(26, initialReadGapOffset.gapStartOffset());
- assertEquals(40, initialReadGapOffset.endOffset());
+ // After records are acquired, the persisterReadResultGapWindow should
be updated
+ assertEquals(26, persisterReadResultGapWindow.gapStartOffset());
+ assertEquals(40, persisterReadResultGapWindow.endOffset());
}
@Test
@@ -3227,12 +3228,12 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(86, sharePartition.nextFetchOffset());
- SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
- assertNotNull(initialReadGapOffset);
+ GapWindow persisterReadResultGapWindow =
sharePartition.persisterReadResultGapWindow();
+ assertNotNull(persisterReadResultGapWindow);
- // After records are acquired, the initialReadGapOffset should be
updated
- assertEquals(86, initialReadGapOffset.gapStartOffset());
- assertEquals(90, initialReadGapOffset.endOffset());
+ // After records are acquired, the persisterReadResultGapWindow should
be updated
+ assertEquals(86, persisterReadResultGapWindow.gapStartOffset());
+ assertEquals(90, persisterReadResultGapWindow.endOffset());
}
@Test
@@ -3271,12 +3272,12 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(31, sharePartition.nextFetchOffset());
- SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
- assertNotNull(initialReadGapOffset);
+ GapWindow persisterReadResultGapWindow =
sharePartition.persisterReadResultGapWindow();
+ assertNotNull(persisterReadResultGapWindow);
- // After records are acquired, the initialReadGapOffset should be
updated
- assertEquals(31, initialReadGapOffset.gapStartOffset());
- assertEquals(70, initialReadGapOffset.endOffset());
+ // After records are acquired, the persisterReadResultGapWindow should
be updated
+ assertEquals(31, persisterReadResultGapWindow.gapStartOffset());
+ assertEquals(70, persisterReadResultGapWindow.endOffset());
}
@Test
@@ -3322,12 +3323,12 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(76, sharePartition.nextFetchOffset());
- SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
- assertNotNull(initialReadGapOffset);
+ GapWindow persisterReadResultGapWindow =
sharePartition.persisterReadResultGapWindow();
+ assertNotNull(persisterReadResultGapWindow);
- // After records are acquired, the initialReadGapOffset should be
updated
- assertEquals(76, initialReadGapOffset.gapStartOffset());
- assertEquals(90, initialReadGapOffset.endOffset());
+ // After records are acquired, the persisterReadResultGapWindow should
be updated
+ assertEquals(76, persisterReadResultGapWindow.gapStartOffset());
+ assertEquals(90, persisterReadResultGapWindow.endOffset());
}
@@ -3375,11 +3376,11 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(27, sharePartition.nextFetchOffset());
- SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
- assertNotNull(initialReadGapOffset);
+ GapWindow persisterReadResultGapWindow =
sharePartition.persisterReadResultGapWindow();
+ assertNotNull(persisterReadResultGapWindow);
- assertEquals(27, initialReadGapOffset.gapStartOffset());
- assertEquals(40, initialReadGapOffset.endOffset());
+ assertEquals(27, persisterReadResultGapWindow.gapStartOffset());
+ assertEquals(40, persisterReadResultGapWindow.endOffset());
}
@Test
@@ -3424,11 +3425,11 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(21, sharePartition.nextFetchOffset());
- SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
- assertNotNull(initialReadGapOffset);
+ GapWindow persisterReadResultGapWindow =
sharePartition.persisterReadResultGapWindow();
+ assertNotNull(persisterReadResultGapWindow);
- assertEquals(21, initialReadGapOffset.gapStartOffset());
- assertEquals(40, initialReadGapOffset.endOffset());
+ assertEquals(21, persisterReadResultGapWindow.gapStartOffset());
+ assertEquals(40, persisterReadResultGapWindow.endOffset());
}
@Test
@@ -3473,11 +3474,11 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(21, sharePartition.nextFetchOffset());
- SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
- assertNotNull(initialReadGapOffset);
+ GapWindow persisterReadResultGapWindow =
sharePartition.persisterReadResultGapWindow();
+ assertNotNull(persisterReadResultGapWindow);
- assertEquals(21, initialReadGapOffset.gapStartOffset());
- assertEquals(40, initialReadGapOffset.endOffset());
+ assertEquals(21, persisterReadResultGapWindow.gapStartOffset());
+ assertEquals(40, persisterReadResultGapWindow.endOffset());
}
@Test
@@ -3525,8 +3526,8 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(51, sharePartition.nextFetchOffset());
- SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
- assertNull(initialReadGapOffset);
+ GapWindow persisterReadResultGapWindow =
sharePartition.persisterReadResultGapWindow();
+ assertNull(persisterReadResultGapWindow);
}
@Test
@@ -3569,8 +3570,8 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(61, sharePartition.nextFetchOffset());
- SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
- assertNull(initialReadGapOffset);
+ GapWindow persisterReadResultGapWindow =
sharePartition.persisterReadResultGapWindow();
+ assertNull(persisterReadResultGapWindow);
}
@Test
@@ -3615,8 +3616,8 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(61, sharePartition.nextFetchOffset());
- SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
- assertNull(initialReadGapOffset);
+ GapWindow persisterReadResultGapWindow =
sharePartition.persisterReadResultGapWindow();
+ assertNull(persisterReadResultGapWindow);
}
@Test
@@ -3664,8 +3665,8 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(61, sharePartition.nextFetchOffset());
- SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
- assertNull(initialReadGapOffset);
+ GapWindow persisterReadResultGapWindow =
sharePartition.persisterReadResultGapWindow();
+ assertNull(persisterReadResultGapWindow);
}
@Test
@@ -3705,11 +3706,11 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(41, sharePartition.nextFetchOffset());
- SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
- assertNotNull(initialReadGapOffset);
+ GapWindow persisterReadResultGapWindow =
sharePartition.persisterReadResultGapWindow();
+ assertNotNull(persisterReadResultGapWindow);
- assertEquals(31, initialReadGapOffset.gapStartOffset());
- assertEquals(40, initialReadGapOffset.endOffset());
+ assertEquals(31, persisterReadResultGapWindow.gapStartOffset());
+ assertEquals(40, persisterReadResultGapWindow.endOffset());
// Fetching from the nextFetchOffset so that endOffset moves ahead
records = memoryRecords(15, 41);
@@ -3725,9 +3726,9 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.stateEpoch());
assertEquals(56, sharePartition.nextFetchOffset());
- // Since the endOffset is now moved ahead, the initialReadGapOffset
should be empty
- initialReadGapOffset = sharePartition.initialReadGapOffset();
- assertNull(initialReadGapOffset);
+ // Since the endOffset is now moved ahead, the
persisterReadResultGapWindow should be empty
+ persisterReadResultGapWindow =
sharePartition.persisterReadResultGapWindow();
+ assertNull(persisterReadResultGapWindow);
}
@Test
@@ -6782,11 +6783,11 @@ public class SharePartitionTest {
assertEquals(40, sharePartition.endOffset());
assertEquals(21, sharePartition.nextFetchOffset());
- SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
- assertNotNull(initialReadGapOffset);
+ GapWindow persisterReadResultGapWindow =
sharePartition.persisterReadResultGapWindow();
+ assertNotNull(persisterReadResultGapWindow);
- assertEquals(21, initialReadGapOffset.gapStartOffset());
- assertEquals(40, initialReadGapOffset.endOffset());
+ assertEquals(21, persisterReadResultGapWindow.gapStartOffset());
+ assertEquals(40, persisterReadResultGapWindow.endOffset());
}
@Test
@@ -7356,16 +7357,16 @@ public class SharePartitionTest {
sharePartition.maybeInitialize();
- SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
- assertNotNull(initialReadGapOffset);
+ GapWindow persisterReadResultGapWindow =
sharePartition.persisterReadResultGapWindow();
+ assertNotNull(persisterReadResultGapWindow);
- // Since there is a gap in the beginning, the initialReadGapOffset
window is same as the cachedState
- assertEquals(11, initialReadGapOffset.gapStartOffset());
- assertEquals(40, initialReadGapOffset.endOffset());
+ // Since there is a gap in the beginning, the
persisterReadResultGapWindow window is same as the cachedState
+ assertEquals(11, persisterReadResultGapWindow.gapStartOffset());
+ assertEquals(40, persisterReadResultGapWindow.endOffset());
long lastOffsetAcknowledged =
sharePartition.findLastOffsetAcknowledged();
- // Since the initialReadGapOffset window begins at startOffset, we
cannot count any of the offsets as acknowledged.
+ // Since the persisterReadResultGapWindow window begins at
startOffset, we cannot count any of the offsets as acknowledged.
// Thus, lastOffsetAcknowledged should be -1
assertEquals(-1, lastOffsetAcknowledged);
}