This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a27d6e32b02 MINOR: Optimize 
RemoteLogManager#buildFilteredLeaderEpochMap (#20205)
a27d6e32b02 is described below

commit a27d6e32b0265dbb364c3c4533b8261d62ee837a
Author: majialong <[email protected]>
AuthorDate: Thu Jul 24 23:16:27 2025 +0800

    MINOR: Optimize RemoteLogManager#buildFilteredLeaderEpochMap (#20205)
    
    Optimize `RemoteLogManager#buildFilteredLeaderEpochMap` .
    
    Add a temporary unit test `testBuildFilteredLeaderEpochMapModify` in
    `RemoteLogManagerTest` to verify the output consistency of the method
    before and after optimization.
    
    Randomly generate leaderEpochs and iterate 100000 times for
    verification.
    
    ```
        @Test
        public void testBuildFilteredLeaderEpochMapModify() {
            int testIterations = 100000;
    
            for (int i = 0; i < testIterations; i++) { TreeMap<Integer,
    Long> leaderEpochToStartOffset =
    generateRandomLeaderEpochAndStartOffset();
    
                // before optimize              NavigableMap<Integer, Long>
    optimizeBefore =
    RemoteLogManager.buildFilteredLeaderEpochMap(leaderEpochToStartOffset);
    
                // after optimize              NavigableMap<Integer, Long>
    optimizeAfter =
    RemoteLogManager.buildFilteredLeaderEpochMap2(leaderEpochToStartOffset);
    
                assertEquals(optimizeBefore, optimizeAfter);          } }
    
        private static TreeMap<Integer, Long>
    generateRandomLeaderEpochAndStartOffset() {          TreeMap<Integer,
    Long> map = new TreeMap<>();          Random random = new Random();
    int numEntries = random.nextInt(100000);          long lastStartOffset =
    0;
    
            for (int i = 0; i < numEntries; i++) {              // generate
    a leader epoch              int leaderEpoch = random.nextInt(100000);
    long startOffset;
    
                // generate a random start offset , or use the last start
    offset              if (i > 0 && random.nextDouble() < 0.2) {
    startOffset = lastStartOffset;              } else { startOffset =
    Math.abs(random.nextLong()) % 100000;              } lastStartOffset =
    startOffset;
    
                map.put(leaderEpoch, startOffset);
            }
            return map;
        }
    ```
    
    Command:
    ``` ./gradlew storage:test --tests RemoteLogManagerTest```
    
    Result:  All unit tests passed.
    
    <img width="1258" height="424" alt="image"
    
src="https://github.com/user-attachments/assets/7d9fc3b5-3bbc-440f-b1cf-3a2a5f97557a";
    />  <img width="411" height="66" alt="image"
    
src="https://github.com/user-attachments/assets/22a0b443-88e8-43d2-a3f2-51266935ed34";
    />
    
    Reviewers: Kamal Chandraprakash <[email protected]>,
     Chia-Ping Tsai <[email protected]>
---
 .../log/remote/storage/RemoteLogManager.java       | 45 +++++-----------------
 .../log/remote/storage/RemoteLogManagerTest.java   | 12 ++++++
 2 files changed, 21 insertions(+), 36 deletions(-)

diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
index 5ca596ec7b7..58bc519d3d1 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
@@ -1617,51 +1617,24 @@ public class RemoteLogManager implements Closeable, 
AsyncOffsetReader {
     /**
      * Returns a map containing the epoch vs start-offset for the given leader 
epoch map by filtering the epochs that
      * does not contain any messages/records associated with them.
-     * For ex:
-     * <pre>
-     * {@code
-     *  <epoch - start offset>
-     *  0 - 0
-     *  1 - 10
-     *  2 - 20
-     *  3 - 30
-     *  4 - 40
-     *  5 - 60  // epoch 5 does not have records or messages associated with it
-     *  6 - 60
-     *  7 - 70
-     * }
-     * </pre>
-     * When the above leaderEpochMap is passed to this method, it returns the 
following map:
-     * <pre>
-     * {@code
-     *  <epoch - start offset>
-     *  0 - 0
-     *  1 - 10
-     *  2 - 20
-     *  3 - 30
-     *  4 - 40
-     *  6 - 60
-     *  7 - 70
-     * }
-     * </pre>
+     *
      * @param leaderEpochs The leader epoch map to be refined.
+     * @return A map containing only the epochs and their start offsets that 
have associated messages/records.
      */
     // Visible for testing
     static NavigableMap<Integer, Long> 
buildFilteredLeaderEpochMap(NavigableMap<Integer, Long> leaderEpochs) {
-        List<Integer> epochsWithNoMessages = new ArrayList<>();
+        TreeMap<Integer, Long> filteredLeaderEpochs = new TreeMap<>();
         Map.Entry<Integer, Long> previousEpochAndOffset = null;
+
         for (Map.Entry<Integer, Long> currentEpochAndOffset : 
leaderEpochs.entrySet()) {
-            if (previousEpochAndOffset != null && 
previousEpochAndOffset.getValue().equals(currentEpochAndOffset.getValue())) {
-                epochsWithNoMessages.add(previousEpochAndOffset.getKey());
+            if (previousEpochAndOffset != null && 
!previousEpochAndOffset.getValue().equals(currentEpochAndOffset.getValue())) {
+                filteredLeaderEpochs.put(previousEpochAndOffset.getKey(), 
previousEpochAndOffset.getValue());
             }
             previousEpochAndOffset = currentEpochAndOffset;
         }
-        if (epochsWithNoMessages.isEmpty()) {
-            return leaderEpochs;
-        }
-        TreeMap<Integer, Long> filteredLeaderEpochs = new 
TreeMap<>(leaderEpochs);
-        for (Integer epochWithNoMessage : epochsWithNoMessages) {
-            filteredLeaderEpochs.remove(epochWithNoMessage);
+
+        if (previousEpochAndOffset != null) {
+            filteredLeaderEpochs.put(previousEpochAndOffset.getKey(), 
previousEpochAndOffset.getValue());
         }
         return filteredLeaderEpochs;
     }
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
index 99d9d43c9a1..c0b89bbf044 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
@@ -1847,6 +1847,18 @@ public class RemoteLogManagerTest {
 
         NavigableMap<Integer, Long> refinedLeaderEpochMap = 
RemoteLogManager.buildFilteredLeaderEpochMap(leaderEpochToStartOffset);
         assertEquals(expectedLeaderEpochs, refinedLeaderEpochMap);
+
+
+        TreeMap<Integer, Long> leaderEpochToStartOffset2 = new TreeMap<>();
+        leaderEpochToStartOffset2.put(0, 0L);
+        leaderEpochToStartOffset2.put(1, 0L);
+        leaderEpochToStartOffset2.put(2, 0L);
+
+        TreeMap<Integer, Long> expectedLeaderEpochs2 = new TreeMap<>();
+        expectedLeaderEpochs2.put(2, 0L);
+
+        NavigableMap<Integer, Long> refinedLeaderEpochMap2 = 
RemoteLogManager.buildFilteredLeaderEpochMap(leaderEpochToStartOffset2);
+        assertEquals(expectedLeaderEpochs2, refinedLeaderEpochMap2);
     }
 
     @Test

Reply via email to