This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2401cac2f35 MINOR: add test cases for dumpLogSegments (#21538)
2401cac2f35 is described below
commit 2401cac2f35de14f0e6be883d8e28d46046d89a3
Author: TaiJuWu <[email protected]>
AuthorDate: Thu Mar 19 06:47:57 2026 +0800
MINOR: add test cases for dumpLogSegments (#21538)
Add some test to dumpLogSegmentsTest to enhance test coverage.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../org/apache/kafka/tools/DumpLogSegments.java | 37 ++---
.../apache/kafka/tools/DumpLogSegmentsTest.java | 168 +++++++++++++++++++++
2 files changed, 188 insertions(+), 17 deletions(-)
diff --git a/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java
b/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java
index 69e5cf091ba..e0e042ecae8 100644
--- a/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java
+++ b/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java
@@ -118,7 +118,12 @@ public class DumpLogSegments {
System.out.println("Dumping " + file);
String filename = file.getName();
- String suffix = filename.substring(filename.lastIndexOf("."));
+ int dotIndex = filename.lastIndexOf(".");
+ if (dotIndex == -1) {
+ System.err.println("Ignoring unknown file " + file);
+ continue;
+ }
+ String suffix = filename.substring(dotIndex);
switch (suffix) {
case UnifiedLog.LOG_FILE_SUFFIX, Snapshots.SUFFIX ->
@@ -529,47 +534,47 @@ public class DumpLogSegments {
}
static class TimeIndexDumpErrors {
- final Map<String, List<Pair<Long, Long>>>
misMatchesForTimeIndexFilesMap = new HashMap<>();
- final Map<String, List<Pair<Long, Long>>> outOfOrderTimestamp = new
HashMap<>();
- final Map<String, List<Pair<Long, Long>>> shallowOffsetNotFound = new
HashMap<>();
+ final Map<String, List<Map.Entry<Long, Long>>>
misMatchesForTimeIndexFilesMap = new HashMap<>();
+ final Map<String, List<Map.Entry<Long, Long>>> outOfOrderTimestamp =
new HashMap<>();
+ final Map<String, List<Map.Entry<Long, Long>>> shallowOffsetNotFound =
new HashMap<>();
void recordMismatchTimeIndex(File file, long indexTimestamp, long
logTimestamp) {
- List<Pair<Long, Long>> misMatchesSeq =
misMatchesForTimeIndexFilesMap
+ List<Map.Entry<Long, Long>> misMatchesSeq =
misMatchesForTimeIndexFilesMap
.computeIfAbsent(file.getAbsolutePath(), k -> new
ArrayList<>());
- misMatchesSeq.add(new Pair<>(indexTimestamp, logTimestamp));
+ misMatchesSeq.add(Map.entry(indexTimestamp, logTimestamp));
}
void recordOutOfOrderIndexTimestamp(File file, long indexTimestamp,
long prevIndexTimestamp) {
- List<Pair<Long, Long>> outOfOrderSeq = outOfOrderTimestamp
+ List<Map.Entry<Long, Long>> outOfOrderSeq = outOfOrderTimestamp
.computeIfAbsent(file.getAbsolutePath(), k -> new
ArrayList<>());
- outOfOrderSeq.add(new Pair<>(indexTimestamp, prevIndexTimestamp));
+ outOfOrderSeq.add(Map.entry(indexTimestamp, prevIndexTimestamp));
}
void recordShallowOffsetNotFound(File file, long indexOffset, long
logOffset) {
- List<Pair<Long, Long>> shallowOffsetNotFoundSeq =
shallowOffsetNotFound
+ List<Map.Entry<Long, Long>> shallowOffsetNotFoundSeq =
shallowOffsetNotFound
.computeIfAbsent(file.getAbsolutePath(), k -> new
ArrayList<>());
- shallowOffsetNotFoundSeq.add(new Pair<>(indexOffset, logOffset));
+ shallowOffsetNotFoundSeq.add(Map.entry(indexOffset, logOffset));
}
void printErrors() {
misMatchesForTimeIndexFilesMap.forEach((fileName,
listOfMismatches) -> {
System.err.println("Found timestamp mismatch in :" + fileName);
listOfMismatches.forEach(m ->
- System.err.println(" Index timestamp: " + m.first + ",
log timestamp: " + m.second)
+ System.err.println(" Index timestamp: " + m.getKey() + ",
log timestamp: " + m.getValue())
);
});
outOfOrderTimestamp.forEach((fileName, outOfOrderTimestamps) -> {
System.err.println("Found out of order timestamp in :" +
fileName);
outOfOrderTimestamps.forEach(m ->
- System.err.println(" Index timestamp: " + m.first + ",
Previously indexed timestamp: " + m.second)
+ System.err.println(" Index timestamp: " + m.getKey() + ",
Previously indexed timestamp: " + m.getValue())
);
});
- shallowOffsetNotFound.values().forEach(listOfShallowOffsetNotFound
-> {
- System.err.println("The following indexed offsets are not
found in the log.");
+ shallowOffsetNotFound.forEach((fileName,
listOfShallowOffsetNotFound) -> {
+ System.err.println("The following indexed offsets are not
found in :" + fileName);
listOfShallowOffsetNotFound.forEach(pair ->
- System.err.println("Indexed offset: " + pair.first + ",
found log offset: " + pair.second)
+ System.err.println("Indexed offset: " + pair.getKey() + ",
found log offset: " + pair.getValue())
);
});
}
@@ -786,8 +791,6 @@ public class DumpLogSegments {
}
}
- record Pair<F, S>(F first, S second) { }
-
private static class DumpLogSegmentsOptions extends CommandDefaultOptions {
private final OptionSpec<Void> printOpt;
private final OptionSpec<Void> verifyOpt;
diff --git
a/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java
b/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java
index beae202068d..5b403d52352 100644
--- a/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java
@@ -85,12 +85,16 @@ import
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.snapshot.RecordsSnapshotWriter;
+import org.apache.kafka.storage.internals.log.AbortedTxn;
import org.apache.kafka.storage.internals.log.AppendOrigin;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.apache.kafka.storage.internals.log.LogSegment;
+import org.apache.kafka.storage.internals.log.ProducerStateEntry;
+import org.apache.kafka.storage.internals.log.ProducerStateManager;
import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+import org.apache.kafka.storage.internals.log.TransactionIndex;
import org.apache.kafka.storage.internals.log.UnifiedLog;
import org.apache.kafka.storage.internals.log.VerificationGuard;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
@@ -99,6 +103,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
@@ -117,10 +122,12 @@ import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Optional;
+import java.util.OptionalLong;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import static org.apache.kafka.tools.ToolsTestUtils.captureStandardErr;
import static org.apache.kafka.tools.ToolsTestUtils.captureStandardOut;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -330,6 +337,25 @@ public class DumpLogSegmentsTest {
assertEquals(Collections.emptyMap(), errors.shallowOffsetNotFound);
}
+ @Test
+ public void testIndexSanityCheck() throws Exception {
+ log = createTestLog();
+ addSimpleRecords(log, new ArrayList<>());
+
+ String output = runDumpLogSegments(new String[]
{"--index-sanity-check", "--files", indexFilePath});
+ assertTrue(output.contains("passed sanity check"), output);
+ }
+
+ @Test
+ public void testTimeIndexVerifyOnly() throws Exception {
+ log = createTestLog();
+ addSimpleRecords(log, new ArrayList<>());
+
+ String errOutput = captureStandardErr(
+ () -> runDumpLogSegments(new String[] {"--verify-index-only",
"--files", indexFilePath}));
+ assertTrue(errOutput.isEmpty(), errOutput);
+ }
+
private int countSubstring(String str, String sub) {
int count = 0;
for (int i = 0; i <= str.length() - sub.length(); i++) {
@@ -1455,4 +1481,146 @@ public class DumpLogSegmentsTest {
"Output should have balanced braces (no unmatched closing brace).
" +
"Found " + openBraces + " '{' and " + closeBraces + " '}'");
}
+
+ @Test
+ public void testDumpTxnIndex() throws Exception {
+ File txnIndexFile = new File(logDir, segmentName + ".txnindex");
+ try (TransactionIndex index = new TransactionIndex(0L, txnIndexFile)) {
+ index.append(new AbortedTxn(1L, 0, 10, 11));
+ index.append(new AbortedTxn(2L, 15, 25, 26));
+ index.flush();
+ }
+
+ String output = runDumpLogSegments(new String[]{"--files",
txnIndexFile.getAbsolutePath()});
+ assertTrue(output.contains("version: 0 producerId: 1 firstOffset: 0
lastOffset: 10 lastStableOffset: 11"), output);
+ assertTrue(output.contains("version: 0 producerId: 2 firstOffset: 15
lastOffset: 25 lastStableOffset: 26"), output);
+ }
+
+ @Test
+ public void testDumpProducerIdSnapshot() throws Exception {
+ File snapshotFile = new File(logDir, segmentName + ".snapshot");
+ Map<Long, ProducerStateEntry> entries = new HashMap<>();
+ entries.put(1L, new ProducerStateEntry(1L, (short) 5, 10, 12345L,
OptionalLong.of(100L)));
+ entries.put(2L, new ProducerStateEntry(2L, (short) 3, 7, 67890L,
OptionalLong.empty()));
+ ProducerStateManager.writeSnapshot(snapshotFile, entries, true);
+
+ String output = runDumpLogSegments(new String[]{"--files",
snapshotFile.getAbsolutePath()});
+ assertTrue(output.contains("producerId: 1"), output);
+ assertTrue(output.contains("producerEpoch: 5"), output);
+ assertTrue(output.contains("coordinatorEpoch: 10"), output);
+ assertTrue(output.contains("lastTimestamp: 12345"), output);
+ assertTrue(output.contains("currentTxnFirstOffset:
OptionalLong[100]"), output);
+ assertTrue(output.contains("producerId: 2"), output);
+ assertTrue(output.contains("producerEpoch: 3"), output);
+ assertTrue(output.contains("coordinatorEpoch: 7"), output);
+ assertTrue(output.contains("lastTimestamp: 67890"), output);
+ assertTrue(output.contains("currentTxnFirstOffset:
OptionalLong.empty"), output);
+ }
+
+ @Test
+ public void testDumpProducerIdSnapshotWithBatchMetadata() throws Exception
{
+ log = createTestLog();
+
log.appendAsLeader(MemoryRecords.withIdempotentRecords(Compression.NONE, 42L,
(short) 1, 0,
+ new SimpleRecord("a".getBytes()),
+ new SimpleRecord("b".getBytes())
+ ), 0);
+ log.roll();
+
+ // Find the snapshot file in the log directory
+ File[] snapshotFiles = logDir.listFiles((dir, name) ->
name.endsWith(".snapshot"));
+ assertTrue(snapshotFiles != null && snapshotFiles.length > 0,
"Expected at least one snapshot file");
+
+ String output = runDumpLogSegments(new String[]{"--files",
snapshotFiles[0].getAbsolutePath()});
+ assertTrue(output.contains("producerId: 42"), output);
+ assertTrue(output.contains("producerEpoch: 1"), output);
+ assertTrue(output.contains("coordinatorEpoch: -1"), output);
+ assertTrue(output.contains("currentTxnFirstOffset:
OptionalLong.empty"), output);
+ assertTrue(output.contains("lastTimestamp: -1"), output);
+ assertTrue(output.contains("firstSequence: 0"), output);
+ assertTrue(output.contains("lastSequence: 1"), output);
+ assertTrue(output.contains("lastOffset: 1"), output);
+ assertTrue(output.contains("offsetDelta: 1"), output);
+ assertTrue(output.contains("timestamp: -1"), output);
+ }
+
+ @Test
+ public void testDumpProducerIdSnapshotCorrupt() throws Exception {
+ File snapshotFile = new File(logDir, segmentName + ".snapshot");
+ Files.write(snapshotFile.toPath(), new byte[]{0, 1, 2, 3, 4, 5});
+
+ String errOutput = captureStandardErr(() -> {
+ try {
+ DumpLogSegments.main(new String[]{"--files",
snapshotFile.getAbsolutePath()});
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ assertFalse(errOutput.isEmpty(), "Expected error output for corrupt
snapshot");
+ }
+
+ @Test
+ public void testTimeIndexDumpErrorsPrintErrors() {
+ DumpLogSegments.TimeIndexDumpErrors errors = new
DumpLogSegments.TimeIndexDumpErrors();
+ File fakeFile = new File("/tmp/fake.timeindex");
+
+ errors.recordMismatchTimeIndex(fakeFile, 100L, 200L);
+ errors.recordOutOfOrderIndexTimestamp(fakeFile, 50L, 100L);
+ errors.recordShallowOffsetNotFound(fakeFile, 10L, -1L);
+
+ String errOutput = captureStandardErr(errors::printErrors);
+ assertTrue(errOutput.contains("Found timestamp mismatch in"),
errOutput);
+ assertTrue(errOutput.contains("Index timestamp: 100, log timestamp:
200"), errOutput);
+ assertTrue(errOutput.contains("Found out of order timestamp in"),
errOutput);
+ assertTrue(errOutput.contains("Index timestamp: 50, Previously indexed
timestamp: 100"), errOutput);
+ assertTrue(errOutput.contains("The following indexed offsets are not
found in"), errOutput);
+ assertTrue(errOutput.contains("Indexed offset: 10, found log offset:
-1"), errOutput);
+ }
+
+ @Test
+ public void testPrintTrailingBytes() throws Exception {
+ log = createTestLog();
+ log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, 0,
+ new SimpleRecord("a".getBytes())), 0);
+ log.flush(false);
+ Utils.closeQuietly(log, "UnifiedLog");
+ log = null;
+
+ // Append trailing garbage bytes to the log file
+ try (FileOutputStream fos = new FileOutputStream(logFilePath, true)) {
+ fos.write(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
+ }
+
+ String output = runDumpLogSegments(new String[]{"--files",
logFilePath});
+ assertTrue(output.contains("Found 10 invalid bytes at the end of"),
output);
+ }
+
+ @Test
+ public void testInvalidDecoderClass() {
+ RuntimeException thrown = assertThrows(RuntimeException.class,
+ () -> runDumpLogSegments(new String[]{
+ "--value-decoder-class",
"org.apache.kafka.tools.api.NonExistentDecoder",
+ "--files", logFilePath
+ }));
+ assertTrue(thrown.getMessage().contains("Failed to load decoder
class"), thrown.getMessage());
+ }
+
+ @Test
+ public void testDumpUnknownFileSuffix() throws Exception {
+ File unknownFile = new File(logDir, "testfile.xyz");
+ Files.write(unknownFile.toPath(), new byte[0]);
+
+ String errOutput = captureStandardErr(
+ () -> runDumpLogSegments(new String[]{"--files",
unknownFile.getAbsolutePath()}));
+ assertTrue(errOutput.contains("Ignoring unknown file"), errOutput);
+ }
+
+ @Test
+ public void testDumpFileWithNoDotInName() throws Exception {
+ File noDotFile = new File(logDir, "nodotfile");
+ Files.write(noDotFile.toPath(), new byte[0]);
+
+ String errOutput = captureStandardErr(
+ () -> runDumpLogSegments(new String[]{"--files",
noDotFile.getAbsolutePath()}));
+ assertTrue(errOutput.contains("Ignoring unknown file"), errOutput);
+ }
}