This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a27d6e32b02 MINOR: Optimize
RemoteLogManager#buildFilteredLeaderEpochMap (#20205)
a27d6e32b02 is described below
commit a27d6e32b0265dbb364c3c4533b8261d62ee837a
Author: majialong <[email protected]>
AuthorDate: Thu Jul 24 23:16:27 2025 +0800
MINOR: Optimize RemoteLogManager#buildFilteredLeaderEpochMap (#20205)
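Optimize `RemoteLogManager#buildFilteredLeaderEpochMap` so that the filtered map is built in a single pass over the leader epochs, instead of first collecting the empty epochs and then removing them from a copy of the map.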
A temporary unit test, `testBuildFilteredLeaderEpochMapModify`, was added to
`RemoteLogManagerTest` to verify that the method produces the same output
before and after the optimization.
The test randomly generates leader epoch maps and compares both
implementations over 100000 iterations.
```
@Test
public void testBuildFilteredLeaderEpochMapModify() {
    int testIterations = 100000;
    for (int i = 0; i < testIterations; i++) {
        TreeMap<Integer, Long> leaderEpochToStartOffset =
            generateRandomLeaderEpochAndStartOffset();
        // before optimize
        NavigableMap<Integer, Long> optimizeBefore =
            RemoteLogManager.buildFilteredLeaderEpochMap(leaderEpochToStartOffset);
        // after optimize
        NavigableMap<Integer, Long> optimizeAfter =
            RemoteLogManager.buildFilteredLeaderEpochMap2(leaderEpochToStartOffset);
        assertEquals(optimizeBefore, optimizeAfter);
    }
}

private static TreeMap<Integer, Long> generateRandomLeaderEpochAndStartOffset() {
    TreeMap<Integer, Long> map = new TreeMap<>();
    Random random = new Random();
    int numEntries = random.nextInt(100000);
    long lastStartOffset = 0;
    for (int i = 0; i < numEntries; i++) {
        // generate a leader epoch
        int leaderEpoch = random.nextInt(100000);
        long startOffset;
        // generate a random start offset, or use the last start offset
        if (i > 0 && random.nextDouble() < 0.2) {
            startOffset = lastStartOffset;
        } else {
            startOffset = Math.abs(random.nextLong()) % 100000;
        }
        lastStartOffset = startOffset;
        map.put(leaderEpoch, startOffset);
    }
    return map;
}
```
Command:
```
./gradlew storage:test --tests RemoteLogManagerTest
```
Result: All unit tests passed.
<img width="1258" height="424" alt="image"
src="https://github.com/user-attachments/assets/7d9fc3b5-3bbc-440f-b1cf-3a2a5f97557a"
/> <img width="411" height="66" alt="image"
src="https://github.com/user-attachments/assets/22a0b443-88e8-43d2-a3f2-51266935ed34"
/>
Reviewers: Kamal Chandraprakash <[email protected]>,
Chia-Ping Tsai <[email protected]>
---
.../log/remote/storage/RemoteLogManager.java | 45 +++++-----------------
.../log/remote/storage/RemoteLogManagerTest.java | 12 ++++++
2 files changed, 21 insertions(+), 36 deletions(-)
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
index 5ca596ec7b7..58bc519d3d1 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
@@ -1617,51 +1617,24 @@ public class RemoteLogManager implements Closeable,
AsyncOffsetReader {
/**
* Returns a map containing the epoch vs start-offset for the given leader
epoch map by filtering the epochs that
* does not contain any messages/records associated with them.
- * For ex:
- * <pre>
- * {@code
- * <epoch - start offset>
- * 0 - 0
- * 1 - 10
- * 2 - 20
- * 3 - 30
- * 4 - 40
- * 5 - 60 // epoch 5 does not have records or messages associated with it
- * 6 - 60
- * 7 - 70
- * }
- * </pre>
- * When the above leaderEpochMap is passed to this method, it returns the
following map:
- * <pre>
- * {@code
- * <epoch - start offset>
- * 0 - 0
- * 1 - 10
- * 2 - 20
- * 3 - 30
- * 4 - 40
- * 6 - 60
- * 7 - 70
- * }
- * </pre>
+ *
* @param leaderEpochs The leader epoch map to be refined.
+ * @return A map containing only the epochs and their start offsets that
have associated messages/records.
*/
// Visible for testing
static NavigableMap<Integer, Long>
buildFilteredLeaderEpochMap(NavigableMap<Integer, Long> leaderEpochs) {
- List<Integer> epochsWithNoMessages = new ArrayList<>();
+ TreeMap<Integer, Long> filteredLeaderEpochs = new TreeMap<>();
Map.Entry<Integer, Long> previousEpochAndOffset = null;
+
for (Map.Entry<Integer, Long> currentEpochAndOffset :
leaderEpochs.entrySet()) {
- if (previousEpochAndOffset != null &&
previousEpochAndOffset.getValue().equals(currentEpochAndOffset.getValue())) {
- epochsWithNoMessages.add(previousEpochAndOffset.getKey());
+ if (previousEpochAndOffset != null &&
!previousEpochAndOffset.getValue().equals(currentEpochAndOffset.getValue())) {
+ filteredLeaderEpochs.put(previousEpochAndOffset.getKey(),
previousEpochAndOffset.getValue());
}
previousEpochAndOffset = currentEpochAndOffset;
}
- if (epochsWithNoMessages.isEmpty()) {
- return leaderEpochs;
- }
- TreeMap<Integer, Long> filteredLeaderEpochs = new
TreeMap<>(leaderEpochs);
- for (Integer epochWithNoMessage : epochsWithNoMessages) {
- filteredLeaderEpochs.remove(epochWithNoMessage);
+
+ if (previousEpochAndOffset != null) {
+ filteredLeaderEpochs.put(previousEpochAndOffset.getKey(),
previousEpochAndOffset.getValue());
}
return filteredLeaderEpochs;
}
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
index 99d9d43c9a1..c0b89bbf044 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
@@ -1847,6 +1847,18 @@ public class RemoteLogManagerTest {
NavigableMap<Integer, Long> refinedLeaderEpochMap =
RemoteLogManager.buildFilteredLeaderEpochMap(leaderEpochToStartOffset);
assertEquals(expectedLeaderEpochs, refinedLeaderEpochMap);
+
+
+ TreeMap<Integer, Long> leaderEpochToStartOffset2 = new TreeMap<>();
+ leaderEpochToStartOffset2.put(0, 0L);
+ leaderEpochToStartOffset2.put(1, 0L);
+ leaderEpochToStartOffset2.put(2, 0L);
+
+ TreeMap<Integer, Long> expectedLeaderEpochs2 = new TreeMap<>();
+ expectedLeaderEpochs2.put(2, 0L);
+
+ NavigableMap<Integer, Long> refinedLeaderEpochMap2 =
RemoteLogManager.buildFilteredLeaderEpochMap(leaderEpochToStartOffset2);
+ assertEquals(expectedLeaderEpochs2, refinedLeaderEpochMap2);
}
@Test