This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c5d0ddd6f71 MINOR: Refactored gap window names in share partition 
(#20411)
c5d0ddd6f71 is described below

commit c5d0ddd6f715d1b194446aa045e3fb22712aee50
Author: Apoorv Mittal <[email protected]>
AuthorDate: Wed Aug 27 10:06:43 2025 +0100

    MINOR: Refactored gap window names in share partition (#20411)
    
    As per the suggestion by @adixitconfluent and @chirag-wadhwa5,
    
    [here](https://github.com/apache/kafka/pull/20395#discussion_r2300810004),
    I have refactored the code with clearer variable, method, and class names.
    
    Reviewers: Andrew Schofield <[email protected]>, Chirag Wadhwa
    <[email protected]>
---
 .../java/kafka/server/share/SharePartition.java    |  74 +++---
 .../kafka/server/share/SharePartitionTest.java     | 261 +++++++++++----------
 2 files changed, 168 insertions(+), 167 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index 08a9539dbed..8ed094b85f1 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -260,10 +260,10 @@ public class SharePartition {
     private long endOffset;
 
     /**
-     * The initial read gap offset tracks if there are any gaps in the 
in-flight batch during initial
-     * read of the share partition state from the persister.
+     * The persister read result gap window tracks if there are any gaps in 
the in-flight batch during
+     * initial read of the share partition state from the persister.
      */
-    private InitialReadGapOffset initialReadGapOffset;
+    private GapWindow persisterReadResultGapWindow;
 
     /**
      * We maintain the latest fetch offset and its metadata to estimate the 
minBytes requirement more efficiently.
@@ -475,9 +475,9 @@ public class SharePartition {
                     // in the cached state are not missed
                     updateFindNextFetchOffset(true);
                     endOffset = 
cachedState.lastEntry().getValue().lastOffset();
-                    // initialReadGapOffset is not required, if there are no 
gaps in the read state response
+                    // gapWindow is not required, if there are no gaps in the 
read state response
                     if (gapStartOffset != -1) {
-                        initialReadGapOffset = new 
InitialReadGapOffset(endOffset, gapStartOffset);
+                        persisterReadResultGapWindow = new 
GapWindow(endOffset, gapStartOffset);
                     }
                     // In case the persister read state RPC result contains no 
AVAILABLE records, we can update cached state
                     // and start/end offsets.
@@ -561,20 +561,20 @@ public class SharePartition {
             }
 
             long nextFetchOffset = -1;
-            long gapStartOffset = isInitialReadGapOffsetWindowActive() ? 
initialReadGapOffset.gapStartOffset() : -1;
+            long gapStartOffset = isPersisterReadGapWindowActive() ? 
persisterReadResultGapWindow.gapStartOffset() : -1;
             for (Map.Entry<Long, InFlightBatch> entry : 
cachedState.entrySet()) {
                 // Check if there exists any gap in the in-flight batch which 
needs to be fetched. If
-                // initialReadGapOffset's endOffset is equal to the share 
partition's endOffset, then
+                // gapWindow's endOffset is equal to the share partition's 
endOffset, then
                 // only the initial gaps should be considered. Once share 
partition's endOffset is past
                 // initial read end offset then all gaps are anyway fetched.
-                if (isInitialReadGapOffsetWindowActive()) {
+                if (isPersisterReadGapWindowActive()) {
                     if (entry.getKey() > gapStartOffset) {
                         nextFetchOffset = gapStartOffset;
                         break;
                     }
                     // If the gapStartOffset is already past the last offset 
of the in-flight batch,
                     // then do not consider this batch for finding the next 
fetch offset. For example,
-                    // consider during initialization, the 
initialReadGapOffset is set to 5 and the
+                    // consider during initialization, the gapWindow's gapStartOffset is set to 
5 and the
                     // first cached batch is 15-18. First read will happen at 
offset 5 and say the data
                     // fetched is [5-6], now next fetch offset should be 7. 
This works fine but say
                     // subsequent read returns batch 8-11, and the 
gapStartOffset will be 12. Without
@@ -769,10 +769,10 @@ public class SharePartition {
                 }
 
                 InFlightBatch inFlightBatch = entry.getValue();
-                // If the initialReadGapOffset window is active, we need to 
treat the gaps in between the window as
+                // If the gapWindow is active, we need to treat the 
gaps in between the window as
                 // acquirable. Once the window is inactive (when we have 
acquired all the gaps inside the window),
                 // the remaining gaps are natural (data does not exist at 
those offsets) and we need not acquire them.
-                if (isInitialReadGapOffsetWindowActive()) {
+                if (isPersisterReadGapWindowActive()) {
                     // If nextBatchStartOffset is less than the key of the 
entry, this means the fetch happened for a gap in the cachedState.
                     // Thus, a new batch needs to be acquired for the gap.
                     if (maybeGapStartOffset < entry.getKey()) {
@@ -858,7 +858,7 @@ public class SharePartition {
                 acquiredCount += shareAcquiredRecords.count();
             }
             if (!result.isEmpty()) {
-                maybeUpdateReadGapFetchOffset(result.get(result.size() - 
1).lastOffset() + 1);
+                
maybeUpdatePersisterGapWindowStartOffset(result.get(result.size() - 
1).lastOffset() + 1);
                 return 
maybeFilterAbortedTransactionalAcquiredRecords(fetchPartitionData, 
isolationLevel, new ShareAcquiredRecords(result, acquiredCount));
             }
             return new ShareAcquiredRecords(result, acquiredCount);
@@ -1469,20 +1469,20 @@ public class SharePartition {
     }
 
     // Method to reduce the window that tracks gaps in the cachedState
-    private void maybeUpdateReadGapFetchOffset(long offset) {
+    private void maybeUpdatePersisterGapWindowStartOffset(long offset) {
         lock.writeLock().lock();
         try {
-            if (initialReadGapOffset != null) {
-                // When last cached batch for initial read gap window is 
acquired, then endOffset is
-                // same as the initialReadGapOffset's endOffset, but the gap 
offset to update is
-                // endOffset + 1. Hence, do not update the gap start offset if 
the request offset
+            if (persisterReadResultGapWindow != null) {
+                // When last cached batch for persister's read gap window is 
acquired, then endOffset is
+                // same as the gapWindow's endOffset, but the gap offset to 
update in the method call
+                // is endOffset + 1. Hence, do not update the gap start offset 
if the request offset
                 // is ahead of the endOffset.
-                if (initialReadGapOffset.endOffset() == endOffset && offset <= 
initialReadGapOffset.endOffset()) {
-                    initialReadGapOffset.gapStartOffset(offset);
+                if (persisterReadResultGapWindow.endOffset() == endOffset && 
offset <= persisterReadResultGapWindow.endOffset()) {
+                    persisterReadResultGapWindow.gapStartOffset(offset);
                 } else {
-                    // The initial read gap offset is not valid anymore as the 
end offset has moved
-                    // beyond the initial read gap offset. Hence, reset the 
initial read gap offset.
-                    initialReadGapOffset = null;
+                    // The persister's read gap window is not valid anymore as 
the end offset has moved
+                    // beyond the read gap window's endOffset. Hence, set the 
gap window to null.
+                    persisterReadResultGapWindow = null;
                 }
             }
         } finally {
@@ -1570,14 +1570,14 @@ public class SharePartition {
                 // batches align on batch boundaries. Hence, reset to last 
offset itself if the batch's
                 // last offset is greater than the last offset for 
acquisition, else there could be
                 // a situation where the batch overlaps with the initial read 
gap offset window batch.
-                // For example, if the initial read gap offset window is 10-30 
i.e. initialReadGapOffset's
+                // For example, if the initial read gap offset window is 10-30 
i.e. gapWindow's
                 // startOffset is 10 and endOffset is 30, and the first 
persister's read batch is 15-30.
                 // Say first fetched batch from log is 10-30 and 
maxFetchRecords is 1, then the lastOffset
                 // in this method call would be 14. As the maxFetchRecords is 
lesser than the batch,
                 // hence last batch offset for request offset is fetched. In 
this example it will
                 // be 30, hence check if the initial read gap offset window is 
active and the last acquired
                 // offset should be adjusted to 14 instead of 30.
-                if (isInitialReadGapOffsetWindowActive() && lastAcquiredOffset 
> lastOffset) {
+                if (isPersisterReadGapWindowActive() && lastAcquiredOffset > 
lastOffset) {
                     lastAcquiredOffset = lastOffset;
                 }
             }
@@ -1596,7 +1596,7 @@ public class SharePartition {
             if (lastAcquiredOffset > endOffset) {
                 endOffset = lastAcquiredOffset;
             }
-            maybeUpdateReadGapFetchOffset(lastAcquiredOffset + 1);
+            maybeUpdatePersisterGapWindowStartOffset(lastAcquiredOffset + 1);
             return new ShareAcquiredRecords(acquiredRecords, (int) 
(lastAcquiredOffset - firstAcquiredOffset + 1));
         } finally {
             lock.writeLock().unlock();
@@ -2203,15 +2203,15 @@ public class SharePartition {
             // If the lastOffsetAcknowledged is equal to the last offset of 
entry, then the entire batch can potentially be removed.
             if (lastOffsetAcknowledged == entry.getValue().lastOffset()) {
                 startOffset = cachedState.higherKey(lastOffsetAcknowledged);
-                if (isInitialReadGapOffsetWindowActive()) {
+                if (isPersisterReadGapWindowActive()) {
                     // This case will arise if we have a situation where there 
is an acquirable gap after the lastOffsetAcknowledged.
                     // Ex, the cachedState has following state batches -> {(0, 
10), (11, 20), (31,40)} and all these batches are acked.
-                    // There is a gap from 21 to 30. Let the 
initialReadGapOffset.gapStartOffset be 21. In this case,
+                    // There is a gap from 21 to 30. Let the gapWindow's 
gapStartOffset be 21. In this case,
                     // lastOffsetAcknowledged will be 20, but we cannot simply 
move the start offset to the first offset
                     // of next cachedState batch (next cachedState batch is 31 
to 40). There is an acquirable gap in between (21 to 30)
-                    // and The startOffset should be at 21. Hence, we set 
startOffset to the minimum of initialReadGapOffset.gapStartOffset
+                    // and The startOffset should be at 21. Hence, we set 
startOffset to the minimum of gapWindow.gapStartOffset
                     // and higher key of lastOffsetAcknowledged
-                    startOffset = 
Math.min(initialReadGapOffset.gapStartOffset(), startOffset);
+                    startOffset = 
Math.min(persisterReadResultGapWindow.gapStartOffset(), startOffset);
                 }
                 lastKeyToRemove = entry.getKey();
             } else {
@@ -2276,8 +2276,8 @@ public class SharePartition {
         return isRecordStateAcknowledged(startOffsetState);
     }
 
-    private boolean isInitialReadGapOffsetWindowActive() {
-        return initialReadGapOffset != null && 
initialReadGapOffset.endOffset() == endOffset;
+    private boolean isPersisterReadGapWindowActive() {
+        return persisterReadResultGapWindow != null && 
persisterReadResultGapWindow.endOffset() == endOffset;
     }
 
     /**
@@ -2300,7 +2300,7 @@ public class SharePartition {
             for (NavigableMap.Entry<Long, InFlightBatch> entry : 
cachedState.entrySet()) {
                 InFlightBatch inFlightBatch = entry.getValue();
 
-                if (isInitialReadGapOffsetWindowActive() && 
inFlightBatch.lastOffset() >= initialReadGapOffset.gapStartOffset()) {
+                if (isPersisterReadGapWindowActive() && 
inFlightBatch.lastOffset() >= persisterReadResultGapWindow.gapStartOffset()) {
                     return lastOffsetAcknowledged;
                 }
 
@@ -2865,8 +2865,8 @@ public class SharePartition {
     }
 
     // Visible for testing
-    InitialReadGapOffset initialReadGapOffset() {
-        return initialReadGapOffset;
+    GapWindow persisterReadResultGapWindow() {
+        return persisterReadResultGapWindow;
     }
 
     // Visible for testing.
@@ -2875,17 +2875,17 @@ public class SharePartition {
     }
 
     /**
-     * The InitialReadGapOffset class is used to record the gap start and end 
offset of the probable gaps
+     * The GapWindow class is used to record the gap start and end offset of 
the probable gaps
      * of available records which are neither known to Persister nor to 
SharePartition. Share Partition
      * will use this information to determine the next fetch offset and should 
try to fetch the records
      * in the gap.
      */
     // Visible for Testing
-    static class InitialReadGapOffset {
+    static class GapWindow {
         private final long endOffset;
         private long gapStartOffset;
 
-        InitialReadGapOffset(long endOffset, long gapStartOffset) {
+        GapWindow(long endOffset, long gapStartOffset) {
             this.endOffset = endOffset;
             this.gapStartOffset = gapStartOffset;
         }
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 75f27ec5382..47e214a716f 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -17,6 +17,7 @@
 package kafka.server.share;
 
 import kafka.server.ReplicaManager;
+import kafka.server.share.SharePartition.GapWindow;
 import kafka.server.share.SharePartition.SharePartitionState;
 import kafka.server.share.SharePartitionManager.SharePartitionListener;
 
@@ -965,11 +966,11 @@ public class SharePartitionTest {
         assertEquals(3, 
sharePartition.cachedState().get(21L).batchDeliveryCount());
         assertNull(sharePartition.cachedState().get(21L).offsetState());
 
-        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
-        assertNotNull(initialReadGapOffset);
+        GapWindow persisterReadResultGapWindow = 
sharePartition.persisterReadResultGapWindow();
+        assertNotNull(persisterReadResultGapWindow);
 
-        assertEquals(10, initialReadGapOffset.gapStartOffset());
-        assertEquals(30, initialReadGapOffset.endOffset());
+        assertEquals(10, persisterReadResultGapWindow.gapStartOffset());
+        assertEquals(30, persisterReadResultGapWindow.endOffset());
     }
 
     @Test
@@ -1010,11 +1011,11 @@ public class SharePartitionTest {
         assertEquals(3, 
sharePartition.cachedState().get(30L).batchDeliveryCount());
         assertNull(sharePartition.cachedState().get(30L).offsetState());
 
-        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
-        assertNotNull(initialReadGapOffset);
+        GapWindow persisterReadResultGapWindow = 
sharePartition.persisterReadResultGapWindow();
+        assertNotNull(persisterReadResultGapWindow);
 
-        assertEquals(10, initialReadGapOffset.gapStartOffset());
-        assertEquals(40, initialReadGapOffset.endOffset());
+        assertEquals(10, persisterReadResultGapWindow.gapStartOffset());
+        assertEquals(40, persisterReadResultGapWindow.endOffset());
     }
 
     @Test
@@ -1051,11 +1052,11 @@ public class SharePartitionTest {
         assertEquals(3, 
sharePartition.cachedState().get(30L).batchDeliveryCount());
         assertNull(sharePartition.cachedState().get(30L).offsetState());
 
-        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
-        assertNotNull(initialReadGapOffset);
+        GapWindow persisterReadResultGapWindow = 
sharePartition.persisterReadResultGapWindow();
+        assertNotNull(persisterReadResultGapWindow);
 
-        assertEquals(21, initialReadGapOffset.gapStartOffset());
-        assertEquals(40, initialReadGapOffset.endOffset());
+        assertEquals(21, persisterReadResultGapWindow.gapStartOffset());
+        assertEquals(40, persisterReadResultGapWindow.endOffset());
     }
 
     @Test
@@ -1082,10 +1083,10 @@ public class SharePartitionTest {
         assertEquals(3, sharePartition.stateEpoch());
         assertEquals(31, sharePartition.nextFetchOffset());
 
-        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+        GapWindow persisterReadResultGapWindow = 
sharePartition.persisterReadResultGapWindow();
 
-        // Since there are no gaps present in the readState response, 
initialReadGapOffset should be null
-        assertNull(initialReadGapOffset);
+        // Since there are no gaps present in the readState response, 
persisterReadResultGapWindow should be null
+        assertNull(persisterReadResultGapWindow);
     }
 
     @Test
@@ -1118,9 +1119,9 @@ public class SharePartitionTest {
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).batchState());
         assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
-        assertNotNull(sharePartition.initialReadGapOffset());
-        assertEquals(10L, 
sharePartition.initialReadGapOffset().gapStartOffset());
-        assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
+        assertNotNull(sharePartition.persisterReadResultGapWindow());
+        assertEquals(10L, 
sharePartition.persisterReadResultGapWindow().gapStartOffset());
+        assertEquals(30L, 
sharePartition.persisterReadResultGapWindow().endOffset());
 
         // Create a single batch record that covers the entire range from 10 
to 30 of initial read gap.
         // The records in the batch are from 10 to 49.
@@ -1146,8 +1147,8 @@ public class SharePartitionTest {
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).batchState());
         assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
-        assertNotNull(sharePartition.initialReadGapOffset());
-        assertEquals(15L, 
sharePartition.initialReadGapOffset().gapStartOffset());
+        assertNotNull(sharePartition.persisterReadResultGapWindow());
+        assertEquals(15L, 
sharePartition.persisterReadResultGapWindow().gapStartOffset());
 
         // Send the same batch again to acquire the next set of records.
         acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
@@ -1180,8 +1181,8 @@ public class SharePartitionTest {
         assertEquals(1, 
sharePartition.cachedState().get(23L).batchDeliveryCount());
         assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(26L).batchState());
         assertEquals(30L, sharePartition.endOffset());
-        // As all the gaps are now filled, the initialReadGapOffset should be 
null.
-        assertNull(sharePartition.initialReadGapOffset());
+        // As all the gaps are now filled, the persisterReadResultGapWindow 
should be null.
+        assertNull(sharePartition.persisterReadResultGapWindow());
 
         // Now initial read gap is filled, so the complete batch can be 
acquired despite max fetch records being 1.
         acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
@@ -1234,9 +1235,9 @@ public class SharePartitionTest {
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).batchState());
         assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
-        assertNotNull(sharePartition.initialReadGapOffset());
-        assertEquals(10L, 
sharePartition.initialReadGapOffset().gapStartOffset());
-        assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
+        assertNotNull(sharePartition.persisterReadResultGapWindow());
+        assertEquals(10L, 
sharePartition.persisterReadResultGapWindow().gapStartOffset());
+        assertEquals(30L, 
sharePartition.persisterReadResultGapWindow().endOffset());
 
         // Create a single batch record that covers the entire range from 10 
to 30 of initial read gap.
         // The records in the batch are from 10 to 49.
@@ -1278,8 +1279,8 @@ public class SharePartitionTest {
         assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(26L).batchState());
         assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(31L).batchState());
         assertEquals(49L, sharePartition.endOffset());
-        // As all the gaps are now filled, the initialReadGapOffset should be 
null.
-        assertNull(sharePartition.initialReadGapOffset());
+        // As all the gaps are now filled, the persisterReadResultGapWindow 
should be null.
+        assertNull(sharePartition.persisterReadResultGapWindow());
     }
 
     @Test
@@ -1312,9 +1313,9 @@ public class SharePartitionTest {
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).batchState());
         assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
-        assertNotNull(sharePartition.initialReadGapOffset());
-        assertEquals(10L, 
sharePartition.initialReadGapOffset().gapStartOffset());
-        assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
+        assertNotNull(sharePartition.persisterReadResultGapWindow());
+        assertEquals(10L, 
sharePartition.persisterReadResultGapWindow().gapStartOffset());
+        assertEquals(30L, 
sharePartition.persisterReadResultGapWindow().endOffset());
 
         // Create a single batch record that ends in between the cached batch 
and the fetch offset is
         // post startOffset.
@@ -1357,8 +1358,8 @@ public class SharePartitionTest {
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).offsetState().get(29L).state());
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).offsetState().get(30L).state());
         assertEquals(30L, sharePartition.endOffset());
-        assertNotNull(sharePartition.initialReadGapOffset());
-        assertEquals(28L, 
sharePartition.initialReadGapOffset().gapStartOffset());
+        assertNotNull(sharePartition.persisterReadResultGapWindow());
+        assertEquals(28L, 
sharePartition.persisterReadResultGapWindow().gapStartOffset());
     }
 
     @Test
@@ -1391,9 +1392,9 @@ public class SharePartitionTest {
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).batchState());
         assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
-        assertNotNull(sharePartition.initialReadGapOffset());
-        assertEquals(10L, 
sharePartition.initialReadGapOffset().gapStartOffset());
-        assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
+        assertNotNull(sharePartition.persisterReadResultGapWindow());
+        assertEquals(10L, 
sharePartition.persisterReadResultGapWindow().gapStartOffset());
+        assertEquals(30L, 
sharePartition.persisterReadResultGapWindow().endOffset());
 
         // Create a single batch record where first offset is prior 
startOffset.
         MemoryRecords records = memoryRecords(16, 6);
@@ -1425,8 +1426,8 @@ public class SharePartitionTest {
         assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
         assertEquals(30L, sharePartition.endOffset());
-        assertNotNull(sharePartition.initialReadGapOffset());
-        assertEquals(20L, 
sharePartition.initialReadGapOffset().gapStartOffset());
+        assertNotNull(sharePartition.persisterReadResultGapWindow());
+        assertEquals(20L, 
sharePartition.persisterReadResultGapWindow().gapStartOffset());
     }
 
     @Test
@@ -1459,9 +1460,9 @@ public class SharePartitionTest {
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).batchState());
         assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
-        assertNotNull(sharePartition.initialReadGapOffset());
-        assertEquals(5L, 
sharePartition.initialReadGapOffset().gapStartOffset());
-        assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
+        assertNotNull(sharePartition.persisterReadResultGapWindow());
+        assertEquals(5L, 
sharePartition.persisterReadResultGapWindow().gapStartOffset());
+        assertEquals(30L, 
sharePartition.persisterReadResultGapWindow().endOffset());
 
         // Create multiple batch records that covers the entire range from 5 
to 30 of initial read gap.
         // The records in the batch are from 5 to 49.
@@ -1496,8 +1497,8 @@ public class SharePartitionTest {
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).batchState());
         assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
-        assertNotNull(sharePartition.initialReadGapOffset());
-        assertEquals(7L, 
sharePartition.initialReadGapOffset().gapStartOffset());
+        assertNotNull(sharePartition.persisterReadResultGapWindow());
+        assertEquals(7L, 
sharePartition.persisterReadResultGapWindow().gapStartOffset());
 
         // Remove first batch from the records as the fetch offset has moved 
forward to 7 offset.
         List<RecordBatch> batch = TestUtils.toList(records.batches());
@@ -1524,8 +1525,8 @@ public class SharePartitionTest {
         assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
         assertEquals(30L, sharePartition.endOffset());
-        assertNotNull(sharePartition.initialReadGapOffset());
-        assertEquals(12L, 
sharePartition.initialReadGapOffset().gapStartOffset());
+        assertNotNull(sharePartition.persisterReadResultGapWindow());
+        assertEquals(12L, 
sharePartition.persisterReadResultGapWindow().gapStartOffset());
 
         // Remove the next 2 batches from the records as the fetch offset has 
moved forward to 12 offset.
         int size = batch.get(1).sizeInBytes() + batch.get(2).sizeInBytes();
@@ -1561,8 +1562,8 @@ public class SharePartitionTest {
         assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(23L).batchState());
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
         assertEquals(30L, sharePartition.endOffset());
-        assertNotNull(sharePartition.initialReadGapOffset());
-        assertEquals(26L, 
sharePartition.initialReadGapOffset().gapStartOffset());
+        assertNotNull(sharePartition.persisterReadResultGapWindow());
+        assertEquals(26L, 
sharePartition.persisterReadResultGapWindow().gapStartOffset());
 
         // Remove the next 2 batches from the records as the fetch offset has 
moved forward to 26 offset.
         // Do not remove the 5th batch as it's only partially acquired.
@@ -1590,8 +1591,8 @@ public class SharePartitionTest {
         assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(31L).batchState());
         assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(26L).batchState());
         assertEquals(49L, sharePartition.endOffset());
-        // As all the gaps are now filled, the initialReadGapOffset should be 
null.
-        assertNull(sharePartition.initialReadGapOffset());
+        // As all the gaps are now filled, the persisterReadResultGapWindow 
should be null.
+        assertNull(sharePartition.persisterReadResultGapWindow());
     }
 
     @Test
@@ -1624,9 +1625,9 @@ public class SharePartitionTest {
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).batchState());
         assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
-        assertNotNull(sharePartition.initialReadGapOffset());
-        assertEquals(5L, 
sharePartition.initialReadGapOffset().gapStartOffset());
-        assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
+        assertNotNull(sharePartition.persisterReadResultGapWindow());
+        assertEquals(5L, 
sharePartition.persisterReadResultGapWindow().gapStartOffset());
+        assertEquals(30L, 
sharePartition.persisterReadResultGapWindow().endOffset());
 
         // Create multiple batch records that ends in between the cached batch 
and the fetch offset is
         // post startOffset.
@@ -1676,8 +1677,8 @@ public class SharePartitionTest {
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).offsetState().get(29L).state());
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).offsetState().get(30L).state());
         assertEquals(30L, sharePartition.endOffset());
-        assertNotNull(sharePartition.initialReadGapOffset());
-        assertEquals(28L, 
sharePartition.initialReadGapOffset().gapStartOffset());
+        assertNotNull(sharePartition.persisterReadResultGapWindow());
+        assertEquals(28L, 
sharePartition.persisterReadResultGapWindow().gapStartOffset());
     }
 
     @Test
@@ -1710,9 +1711,9 @@ public class SharePartitionTest {
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).batchState());
         assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
-        assertNotNull(sharePartition.initialReadGapOffset());
-        assertEquals(10L, 
sharePartition.initialReadGapOffset().gapStartOffset());
-        assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
+        assertNotNull(sharePartition.persisterReadResultGapWindow());
+        assertEquals(10L, 
sharePartition.persisterReadResultGapWindow().gapStartOffset());
+        assertEquals(30L, 
sharePartition.persisterReadResultGapWindow().endOffset());
 
         // Create multiple batch records where multiple batches base offsets 
are prior startOffset.
         ByteBuffer buffer = ByteBuffer.allocate(4096);
@@ -1750,8 +1751,8 @@ public class SharePartitionTest {
         assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
         assertEquals(30L, sharePartition.endOffset());
-        assertNotNull(sharePartition.initialReadGapOffset());
-        assertEquals(20L, 
sharePartition.initialReadGapOffset().gapStartOffset());
+        assertNotNull(sharePartition.persisterReadResultGapWindow());
+        assertEquals(20L, 
sharePartition.persisterReadResultGapWindow().gapStartOffset());
     }
 
     @Test
@@ -3034,12 +3035,12 @@ public class SharePartitionTest {
         assertEquals(3, sharePartition.stateEpoch());
         assertEquals(16, sharePartition.nextFetchOffset());
 
-        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
-        assertNotNull(initialReadGapOffset);
+        GapWindow persisterReadResultGapWindow = 
sharePartition.persisterReadResultGapWindow();
+        assertNotNull(persisterReadResultGapWindow);
 
-        // After records are acquired, the initialReadGapOffset should be 
updated
-        assertEquals(16, initialReadGapOffset.gapStartOffset());
-        assertEquals(40, initialReadGapOffset.endOffset());
+        // After records are acquired, the persisterReadResultGapWindow should 
be updated
+        assertEquals(16, persisterReadResultGapWindow.gapStartOffset());
+        assertEquals(40, persisterReadResultGapWindow.endOffset());
     }
 
     @Test
@@ -3073,12 +3074,12 @@ public class SharePartitionTest {
         assertEquals(3, sharePartition.stateEpoch());
         assertEquals(41, sharePartition.nextFetchOffset());
 
-        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
-        assertNotNull(initialReadGapOffset);
+        GapWindow persisterReadResultGapWindow = 
sharePartition.persisterReadResultGapWindow();
+        assertNotNull(persisterReadResultGapWindow);
 
-        // After records are acquired, the initialReadGapOffset should be 
updated
-        assertEquals(21, initialReadGapOffset.gapStartOffset());
-        assertEquals(40, initialReadGapOffset.endOffset());
+        // After records are acquired, the persisterReadResultGapWindow should 
be updated
+        assertEquals(21, persisterReadResultGapWindow.gapStartOffset());
+        assertEquals(40, persisterReadResultGapWindow.endOffset());
     }
 
     @Test
@@ -3126,12 +3127,12 @@ public class SharePartitionTest {
         assertEquals(3, sharePartition.stateEpoch());
         assertEquals(26, sharePartition.nextFetchOffset());
 
-        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
-        assertNotNull(initialReadGapOffset);
+        GapWindow persisterReadResultGapWindow = 
sharePartition.persisterReadResultGapWindow();
+        assertNotNull(persisterReadResultGapWindow);
 
-        // After records are acquired, the initialReadGapOffset should be 
updated
-        assertEquals(26, initialReadGapOffset.gapStartOffset());
-        assertEquals(40, initialReadGapOffset.endOffset());
+        // After records are acquired, the persisterReadResultGapWindow should 
be updated
+        assertEquals(26, persisterReadResultGapWindow.gapStartOffset());
+        assertEquals(40, persisterReadResultGapWindow.endOffset());
     }
 
     @Test
@@ -3170,12 +3171,12 @@ public class SharePartitionTest {
         assertEquals(3, sharePartition.stateEpoch());
         assertEquals(26, sharePartition.nextFetchOffset());
 
-        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
-        assertNotNull(initialReadGapOffset);
+        GapWindow persisterReadResultGapWindow = 
sharePartition.persisterReadResultGapWindow();
+        assertNotNull(persisterReadResultGapWindow);
 
-        // After records are acquired, the initialReadGapOffset should be 
updated
-        assertEquals(26, initialReadGapOffset.gapStartOffset());
-        assertEquals(40, initialReadGapOffset.endOffset());
+        // After records are acquired, the persisterReadResultGapWindow should 
be updated
+        assertEquals(26, persisterReadResultGapWindow.gapStartOffset());
+        assertEquals(40, persisterReadResultGapWindow.endOffset());
     }
 
     @Test
@@ -3227,12 +3228,12 @@ public class SharePartitionTest {
         assertEquals(3, sharePartition.stateEpoch());
         assertEquals(86, sharePartition.nextFetchOffset());
 
-        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
-        assertNotNull(initialReadGapOffset);
+        GapWindow persisterReadResultGapWindow = 
sharePartition.persisterReadResultGapWindow();
+        assertNotNull(persisterReadResultGapWindow);
 
-        // After records are acquired, the initialReadGapOffset should be 
updated
-        assertEquals(86, initialReadGapOffset.gapStartOffset());
-        assertEquals(90, initialReadGapOffset.endOffset());
+        // After records are acquired, the persisterReadResultGapWindow should 
be updated
+        assertEquals(86, persisterReadResultGapWindow.gapStartOffset());
+        assertEquals(90, persisterReadResultGapWindow.endOffset());
     }
 
     @Test
@@ -3271,12 +3272,12 @@ public class SharePartitionTest {
         assertEquals(3, sharePartition.stateEpoch());
         assertEquals(31, sharePartition.nextFetchOffset());
 
-        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
-        assertNotNull(initialReadGapOffset);
+        GapWindow persisterReadResultGapWindow = 
sharePartition.persisterReadResultGapWindow();
+        assertNotNull(persisterReadResultGapWindow);
 
-        // After records are acquired, the initialReadGapOffset should be 
updated
-        assertEquals(31, initialReadGapOffset.gapStartOffset());
-        assertEquals(70, initialReadGapOffset.endOffset());
+        // After records are acquired, the persisterReadResultGapWindow should 
be updated
+        assertEquals(31, persisterReadResultGapWindow.gapStartOffset());
+        assertEquals(70, persisterReadResultGapWindow.endOffset());
     }
 
     @Test
@@ -3322,12 +3323,12 @@ public class SharePartitionTest {
         assertEquals(3, sharePartition.stateEpoch());
         assertEquals(76, sharePartition.nextFetchOffset());
 
-        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
-        assertNotNull(initialReadGapOffset);
+        GapWindow persisterReadResultGapWindow = 
sharePartition.persisterReadResultGapWindow();
+        assertNotNull(persisterReadResultGapWindow);
 
-        // After records are acquired, the initialReadGapOffset should be 
updated
-        assertEquals(76, initialReadGapOffset.gapStartOffset());
-        assertEquals(90, initialReadGapOffset.endOffset());
+        // After records are acquired, the persisterReadResultGapWindow should 
be updated
+        assertEquals(76, persisterReadResultGapWindow.gapStartOffset());
+        assertEquals(90, persisterReadResultGapWindow.endOffset());
     }
 
 
@@ -3375,11 +3376,11 @@ public class SharePartitionTest {
         assertEquals(3, sharePartition.stateEpoch());
         assertEquals(27, sharePartition.nextFetchOffset());
 
-        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
-        assertNotNull(initialReadGapOffset);
+        GapWindow persisterReadResultGapWindow = 
sharePartition.persisterReadResultGapWindow();
+        assertNotNull(persisterReadResultGapWindow);
 
-        assertEquals(27, initialReadGapOffset.gapStartOffset());
-        assertEquals(40, initialReadGapOffset.endOffset());
+        assertEquals(27, persisterReadResultGapWindow.gapStartOffset());
+        assertEquals(40, persisterReadResultGapWindow.endOffset());
     }
 
     @Test
@@ -3424,11 +3425,11 @@ public class SharePartitionTest {
         assertEquals(3, sharePartition.stateEpoch());
         assertEquals(21, sharePartition.nextFetchOffset());
 
-        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
-        assertNotNull(initialReadGapOffset);
+        GapWindow persisterReadResultGapWindow = 
sharePartition.persisterReadResultGapWindow();
+        assertNotNull(persisterReadResultGapWindow);
 
-        assertEquals(21, initialReadGapOffset.gapStartOffset());
-        assertEquals(40, initialReadGapOffset.endOffset());
+        assertEquals(21, persisterReadResultGapWindow.gapStartOffset());
+        assertEquals(40, persisterReadResultGapWindow.endOffset());
     }
 
     @Test
@@ -3473,11 +3474,11 @@ public class SharePartitionTest {
         assertEquals(3, sharePartition.stateEpoch());
         assertEquals(21, sharePartition.nextFetchOffset());
 
-        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
-        assertNotNull(initialReadGapOffset);
+        GapWindow persisterReadResultGapWindow = 
sharePartition.persisterReadResultGapWindow();
+        assertNotNull(persisterReadResultGapWindow);
 
-        assertEquals(21, initialReadGapOffset.gapStartOffset());
-        assertEquals(40, initialReadGapOffset.endOffset());
+        assertEquals(21, persisterReadResultGapWindow.gapStartOffset());
+        assertEquals(40, persisterReadResultGapWindow.endOffset());
     }
 
     @Test
@@ -3525,8 +3526,8 @@ public class SharePartitionTest {
         assertEquals(3, sharePartition.stateEpoch());
         assertEquals(51, sharePartition.nextFetchOffset());
 
-        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
-        assertNull(initialReadGapOffset);
+        GapWindow persisterReadResultGapWindow = 
sharePartition.persisterReadResultGapWindow();
+        assertNull(persisterReadResultGapWindow);
     }
 
     @Test
@@ -3569,8 +3570,8 @@ public class SharePartitionTest {
         assertEquals(3, sharePartition.stateEpoch());
         assertEquals(61, sharePartition.nextFetchOffset());
 
-        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
-        assertNull(initialReadGapOffset);
+        GapWindow persisterReadResultGapWindow = 
sharePartition.persisterReadResultGapWindow();
+        assertNull(persisterReadResultGapWindow);
     }
 
     @Test
@@ -3615,8 +3616,8 @@ public class SharePartitionTest {
         assertEquals(3, sharePartition.stateEpoch());
         assertEquals(61, sharePartition.nextFetchOffset());
 
-        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
-        assertNull(initialReadGapOffset);
+        GapWindow persisterReadResultGapWindow = 
sharePartition.persisterReadResultGapWindow();
+        assertNull(persisterReadResultGapWindow);
     }
 
     @Test
@@ -3664,8 +3665,8 @@ public class SharePartitionTest {
         assertEquals(3, sharePartition.stateEpoch());
         assertEquals(61, sharePartition.nextFetchOffset());
 
-        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
-        assertNull(initialReadGapOffset);
+        GapWindow persisterReadResultGapWindow = 
sharePartition.persisterReadResultGapWindow();
+        assertNull(persisterReadResultGapWindow);
     }
 
     @Test
@@ -3705,11 +3706,11 @@ public class SharePartitionTest {
         assertEquals(3, sharePartition.stateEpoch());
         assertEquals(41, sharePartition.nextFetchOffset());
 
-        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
-        assertNotNull(initialReadGapOffset);
+        GapWindow persisterReadResultGapWindow = 
sharePartition.persisterReadResultGapWindow();
+        assertNotNull(persisterReadResultGapWindow);
 
-        assertEquals(31, initialReadGapOffset.gapStartOffset());
-        assertEquals(40, initialReadGapOffset.endOffset());
+        assertEquals(31, persisterReadResultGapWindow.gapStartOffset());
+        assertEquals(40, persisterReadResultGapWindow.endOffset());
 
         // Fetching from the  nextFetchOffset so that endOffset moves ahead
         records = memoryRecords(15, 41);
@@ -3725,9 +3726,9 @@ public class SharePartitionTest {
         assertEquals(3, sharePartition.stateEpoch());
         assertEquals(56, sharePartition.nextFetchOffset());
 
-        // Since the endOffset is now moved ahead, the initialReadGapOffset 
should be empty
-        initialReadGapOffset = sharePartition.initialReadGapOffset();
-        assertNull(initialReadGapOffset);
+        // Since the endOffset is now moved ahead, the 
persisterReadResultGapWindow should be null
+        persisterReadResultGapWindow = 
sharePartition.persisterReadResultGapWindow();
+        assertNull(persisterReadResultGapWindow);
     }
 
     @Test
@@ -6782,11 +6783,11 @@ public class SharePartitionTest {
         assertEquals(40, sharePartition.endOffset());
         assertEquals(21, sharePartition.nextFetchOffset());
 
-        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
-        assertNotNull(initialReadGapOffset);
+        GapWindow persisterReadResultGapWindow = 
sharePartition.persisterReadResultGapWindow();
+        assertNotNull(persisterReadResultGapWindow);
 
-        assertEquals(21, initialReadGapOffset.gapStartOffset());
-        assertEquals(40, initialReadGapOffset.endOffset());
+        assertEquals(21, persisterReadResultGapWindow.gapStartOffset());
+        assertEquals(40, persisterReadResultGapWindow.endOffset());
     }
 
     @Test
@@ -7356,16 +7357,16 @@ public class SharePartitionTest {
 
         sharePartition.maybeInitialize();
 
-        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
-        assertNotNull(initialReadGapOffset);
+        GapWindow persisterReadResultGapWindow = 
sharePartition.persisterReadResultGapWindow();
+        assertNotNull(persisterReadResultGapWindow);
 
-        // Since there is a gap in the beginning, the initialReadGapOffset 
window is same as the cachedState
-        assertEquals(11, initialReadGapOffset.gapStartOffset());
-        assertEquals(40, initialReadGapOffset.endOffset());
+        // Since there is a gap in the beginning, the 
persisterReadResultGapWindow is the same as the cachedState
+        assertEquals(11, persisterReadResultGapWindow.gapStartOffset());
+        assertEquals(40, persisterReadResultGapWindow.endOffset());
 
         long lastOffsetAcknowledged = 
sharePartition.findLastOffsetAcknowledged();
 
-        // Since the initialReadGapOffset window begins at startOffset, we 
cannot count any of the offsets as acknowledged.
+        // Since the persisterReadResultGapWindow begins at 
startOffset, we cannot count any of the offsets as acknowledged.
         // Thus, lastOffsetAcknowledged should be -1
         assertEquals(-1, lastOffsetAcknowledged);
     }


Reply via email to