divijvaidya commented on code in PR #15472:
URL: https://github.com/apache/kafka/pull/15472#discussion_r1514414426
##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -202,6 +203,9 @@ public List<EpochEntry> read() {
private final UnifiedLog mockLog = mock(UnifiedLog.class);
+ Integer maxEntries = 30;
Review Comment:
Please make this `private static final`.
Also, for constants we usually use capitalized snake case, such as MAX_ENTRIES
(same for the base offset).
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -722,6 +727,13 @@ private void copyLogSegment(UnifiedLog log, LogSegment
segment, long nextSegment
File logFile = segment.log().file();
String logFileName = logFile.getName();
+ // Corrupted indexes should not be uploaded to remote storage
+ // Example case: Local storage was filled, what caused index
corruption
+ // We should avoid uploading such segments
+ segment.timeIndex().sanityCheck();
+ segment.offsetIndex().sanityCheck();
+ segment.txnIndex().sanityCheck();
+
logger.info("Copying {} to remote storage.", logFileName);
Review Comment:
This should probably be moved before the sanity checks. That way, when an index
check fails during debugging, it is easy to tell which segment was being
uploaded.
##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -2482,6 +2486,134 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata
remoteLogSegmentMetadata, l
}
}
+ @Test
+ void testCopyLogSegmentsToRemoteShouldNotCopySegmentWithMissingIndexes()
throws Exception {
+ long segmentStartOffset = 0L;
+
+ // leader epoch preparation
+ checkpoint.write(totalEpochEntries);
+ LeaderEpochFileCache cache = new
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+ when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
anyInt())).thenReturn(Optional.of(0L));
+
+ // create log segment, with 0 as log start offset
+ LogSegment segment = mock(LogSegment.class);
+
+ // Segment does not nage timeIndex() what is not acceptable to
sanityChecks.
+ // In that case segment won't be copied.
+ when(segment.baseOffset()).thenReturn(segmentStartOffset);
+
+ when(mockLog.activeSegment()).thenReturn(segment);
+ when(mockLog.logStartOffset()).thenReturn(segmentStartOffset);
+ when(mockLog.logSegments(anyLong(),
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment)));
+ when(mockLog.lastStableOffset()).thenReturn(150L);
+
+ RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
+ task.copyLogSegmentsToRemote(mockLog);
+
+ // verify the remoteLogMetadataManager never add any metadata and
remoteStorageManager never copy log segments
+ // Since segment with index corruption should not be uploaded
+ verify(remoteLogMetadataManager,
never()).addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class));
+ verify(remoteStorageManager,
never()).copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
any(LogSegmentData.class));
+ verify(remoteLogMetadataManager,
never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
+ }
+
+
+ @Test
+ void testCorruptedTimeIndexes() throws Exception {
+ // copyLogSegment is executed in case we have more than 1 segment,
what is why we create 2 of them
+ long oldSegmentStartOffset = 0L;
+ long nextSegmentStartOffset = 150L;
+
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+ // leader epoch preparation
+ checkpoint.write(totalEpochEntries);
+ LeaderEpochFileCache cache = new
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+ when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
anyInt())).thenReturn(Optional.of(0L));
+
+ File tempFile = TestUtils.tempFile();
+ File mockProducerSnapshotIndex = TestUtils.tempFile();
+ File tempDir = TestUtils.tempDirectory();
+ // create 2 log segments, with 0 and 150 as log start offset
+ LogSegment oldSegment = mock(LogSegment.class);
+ LogSegment activeSegment = mock(LogSegment.class);
+
+ when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+ when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+ // Mock all required segment data to be managed by RLMTask
+ FileRecords fileRecords = mock(FileRecords.class);
+ when(oldSegment.log()).thenReturn(fileRecords);
+ when(fileRecords.file()).thenReturn(tempFile);
+ when(fileRecords.sizeInBytes()).thenReturn(10);
+ when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+ File txnFile1 = UnifiedLog.transactionIndexFile(tempDir,
oldSegmentStartOffset, "");
+ txnFile1.createNewFile();
+
+ File timeindexFile = TestUtils.tempFile();
+
+ TimeIndex ti = spy(new TimeIndex(timeindexFile, 45L, 12));
Review Comment:
May I suggest passing false as the 4th param, i.e. writable = false, since
we don't intend to write to this file? The same applies to the offset index.
Also, please close them at the end of the test; otherwise they will remain
mmapped.
##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -2482,6 +2486,134 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata
remoteLogSegmentMetadata, l
}
}
+ @Test
+ void testCopyLogSegmentsToRemoteShouldNotCopySegmentWithMissingIndexes()
throws Exception {
+ long segmentStartOffset = 0L;
+
+ // leader epoch preparation
+ checkpoint.write(totalEpochEntries);
+ LeaderEpochFileCache cache = new
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+ when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
anyInt())).thenReturn(Optional.of(0L));
+
+ // create log segment, with 0 as log start offset
+ LogSegment segment = mock(LogSegment.class);
+
+ // Segment does not nage timeIndex() what is not acceptable to
sanityChecks.
+ // In that case segment won't be copied.
+ when(segment.baseOffset()).thenReturn(segmentStartOffset);
+
+ when(mockLog.activeSegment()).thenReturn(segment);
+ when(mockLog.logStartOffset()).thenReturn(segmentStartOffset);
+ when(mockLog.logSegments(anyLong(),
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment)));
+ when(mockLog.lastStableOffset()).thenReturn(150L);
+
+ RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
+ task.copyLogSegmentsToRemote(mockLog);
+
+ // verify the remoteLogMetadataManager never add any metadata and
remoteStorageManager never copy log segments
+ // Since segment with index corruption should not be uploaded
+ verify(remoteLogMetadataManager,
never()).addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class));
+ verify(remoteStorageManager,
never()).copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
any(LogSegmentData.class));
+ verify(remoteLogMetadataManager,
never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
+ }
+
+
+ @Test
+ void testCorruptedTimeIndexes() throws Exception {
+ // copyLogSegment is executed in case we have more than 1 segment,
what is why we create 2 of them
+ long oldSegmentStartOffset = 0L;
+ long nextSegmentStartOffset = 150L;
+
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+ // leader epoch preparation
+ checkpoint.write(totalEpochEntries);
+ LeaderEpochFileCache cache = new
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+ when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
anyInt())).thenReturn(Optional.of(0L));
+
+ File tempFile = TestUtils.tempFile();
+ File mockProducerSnapshotIndex = TestUtils.tempFile();
+ File tempDir = TestUtils.tempDirectory();
+ // create 2 log segments, with 0 and 150 as log start offset
+ LogSegment oldSegment = mock(LogSegment.class);
+ LogSegment activeSegment = mock(LogSegment.class);
+
+ when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+ when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+ // Mock all required segment data to be managed by RLMTask
+ FileRecords fileRecords = mock(FileRecords.class);
+ when(oldSegment.log()).thenReturn(fileRecords);
+ when(fileRecords.file()).thenReturn(tempFile);
+ when(fileRecords.sizeInBytes()).thenReturn(10);
+ when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+ File txnFile1 = UnifiedLog.transactionIndexFile(tempDir,
oldSegmentStartOffset, "");
+ txnFile1.createNewFile();
+
+ File timeindexFile = TestUtils.tempFile();
+
+ TimeIndex ti = spy(new TimeIndex(timeindexFile, 45L, 12));
+ when(ti.entries()).thenReturn(1);
+ // One of the checks to detect index being corrupted is checking that:
+ // offset < baseOffset, BaseOffset is 45, offset => 0
+ when(ti.lastEntry()).thenReturn(new TimestampOffset(0L, 0L));
+
+ // Initialise timeIndex for oldSegment
+ when(oldSegment.timeIndex()).thenReturn(ti);
+ when(oldSegment.txnIndex()).thenReturn(new
TransactionIndex(nextSegmentStartOffset, txnFile1));
+ when(oldSegment.offsetIndex()).thenReturn(new
OffsetIndex(TestUtils.tempFile(),
+ oldSegmentStartOffset, maxEntries * 8));
+
+ File txnFile2 = UnifiedLog.transactionIndexFile(tempDir,
nextSegmentStartOffset, "");
+ txnFile2.createNewFile();
+
+ when(activeSegment.timeIndex()).thenReturn(new
TimeIndex(TestUtils.tempFile(), 45L, 1));
+ when(activeSegment.log()).thenReturn(fileRecords);
+ when(activeSegment.txnIndex()).thenReturn(new
TransactionIndex(nextSegmentStartOffset, txnFile2));
+ when(activeSegment.offsetIndex()).thenReturn(new
OffsetIndex(TestUtils.tempFile(),
+ nextSegmentStartOffset, maxEntries * 8));
+
+ when(mockLog.activeSegment()).thenReturn(activeSegment);
+ when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+ when(mockLog.logSegments(anyLong(),
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment,
activeSegment)));
+
+ ProducerStateManager mockStateManager =
mock(ProducerStateManager.class);
+ when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+ when(mockLog.lastStableOffset()).thenReturn(250L);
+
+
+ OffsetIndex idx =
LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir,
oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+ TimeIndex timeIdx =
LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset,
""), oldSegmentStartOffset, 1500).get();
+ File txnFile = UnifiedLog.transactionIndexFile(tempDir,
oldSegmentStartOffset, "");
+ txnFile.createNewFile();
+ TransactionIndex txnIndex = new
TransactionIndex(oldSegmentStartOffset, txnFile);
+ when(oldSegment.timeIndex()).thenReturn(timeIdx);
+ when(oldSegment.offsetIndex()).thenReturn(idx);
+ when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+ CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+ dummyFuture.complete(null);
+
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
Review Comment:
Do we need these? These functions will never be called, right?
##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -2482,6 +2486,134 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata
remoteLogSegmentMetadata, l
}
}
+ @Test
+ void testCopyLogSegmentsToRemoteShouldNotCopySegmentWithMissingIndexes()
throws Exception {
+ long segmentStartOffset = 0L;
+
+ // leader epoch preparation
+ checkpoint.write(totalEpochEntries);
+ LeaderEpochFileCache cache = new
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+ when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
anyInt())).thenReturn(Optional.of(0L));
+
+ // create log segment, with 0 as log start offset
+ LogSegment segment = mock(LogSegment.class);
+
+ // Segment does not nage timeIndex() what is not acceptable to
sanityChecks.
+ // In that case segment won't be copied.
+ when(segment.baseOffset()).thenReturn(segmentStartOffset);
+
+ when(mockLog.activeSegment()).thenReturn(segment);
+ when(mockLog.logStartOffset()).thenReturn(segmentStartOffset);
+ when(mockLog.logSegments(anyLong(),
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment)));
+ when(mockLog.lastStableOffset()).thenReturn(150L);
+
+ RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
+ task.copyLogSegmentsToRemote(mockLog);
+
+ // verify the remoteLogMetadataManager never add any metadata and
remoteStorageManager never copy log segments
+ // Since segment with index corruption should not be uploaded
+ verify(remoteLogMetadataManager,
never()).addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class));
+ verify(remoteStorageManager,
never()).copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
any(LogSegmentData.class));
+ verify(remoteLogMetadataManager,
never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
+ }
+
+
+ @Test
+ void testCorruptedTimeIndexes() throws Exception {
+ // copyLogSegment is executed in case we have more than 1 segment,
what is why we create 2 of them
+ long oldSegmentStartOffset = 0L;
+ long nextSegmentStartOffset = 150L;
+
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+ // leader epoch preparation
+ checkpoint.write(totalEpochEntries);
+ LeaderEpochFileCache cache = new
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+ when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
anyInt())).thenReturn(Optional.of(0L));
+
+ File tempFile = TestUtils.tempFile();
+ File mockProducerSnapshotIndex = TestUtils.tempFile();
+ File tempDir = TestUtils.tempDirectory();
+ // create 2 log segments, with 0 and 150 as log start offset
+ LogSegment oldSegment = mock(LogSegment.class);
+ LogSegment activeSegment = mock(LogSegment.class);
+
+ when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+ when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+ // Mock all required segment data to be managed by RLMTask
+ FileRecords fileRecords = mock(FileRecords.class);
+ when(oldSegment.log()).thenReturn(fileRecords);
+ when(fileRecords.file()).thenReturn(tempFile);
+ when(fileRecords.sizeInBytes()).thenReturn(10);
+ when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+ File txnFile1 = UnifiedLog.transactionIndexFile(tempDir,
oldSegmentStartOffset, "");
+ txnFile1.createNewFile();
+
+ File timeindexFile = TestUtils.tempFile();
+
+ TimeIndex ti = spy(new TimeIndex(timeindexFile, 45L, 12));
+ when(ti.entries()).thenReturn(1);
+ // One of the checks to detect index being corrupted is checking that:
+ // offset < baseOffset, BaseOffset is 45, offset => 0
+ when(ti.lastEntry()).thenReturn(new TimestampOffset(0L, 0L));
+
+ // Initialise timeIndex for oldSegment
+ when(oldSegment.timeIndex()).thenReturn(ti);
+ when(oldSegment.txnIndex()).thenReturn(new
TransactionIndex(nextSegmentStartOffset, txnFile1));
+ when(oldSegment.offsetIndex()).thenReturn(new
OffsetIndex(TestUtils.tempFile(),
+ oldSegmentStartOffset, maxEntries * 8));
+
+ File txnFile2 = UnifiedLog.transactionIndexFile(tempDir,
nextSegmentStartOffset, "");
+ txnFile2.createNewFile();
+
+ when(activeSegment.timeIndex()).thenReturn(new
TimeIndex(TestUtils.tempFile(), 45L, 1));
+ when(activeSegment.log()).thenReturn(fileRecords);
+ when(activeSegment.txnIndex()).thenReturn(new
TransactionIndex(nextSegmentStartOffset, txnFile2));
+ when(activeSegment.offsetIndex()).thenReturn(new
OffsetIndex(TestUtils.tempFile(),
+ nextSegmentStartOffset, maxEntries * 8));
+
+ when(mockLog.activeSegment()).thenReturn(activeSegment);
+ when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+ when(mockLog.logSegments(anyLong(),
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment,
activeSegment)));
+
+ ProducerStateManager mockStateManager =
mock(ProducerStateManager.class);
+ when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+ when(mockLog.lastStableOffset()).thenReturn(250L);
+
Review Comment:
Do we need this? Our index validation should throw an error before we start
working with the producer state.
##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -2482,6 +2486,134 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata
remoteLogSegmentMetadata, l
}
}
+ @Test
+ void testCopyLogSegmentsToRemoteShouldNotCopySegmentWithMissingIndexes()
throws Exception {
+ long segmentStartOffset = 0L;
+
+ // leader epoch preparation
+ checkpoint.write(totalEpochEntries);
+ LeaderEpochFileCache cache = new
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+ when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
anyInt())).thenReturn(Optional.of(0L));
+
+ // create log segment, with 0 as log start offset
+ LogSegment segment = mock(LogSegment.class);
+
+ // Segment does not nage timeIndex() what is not acceptable to
sanityChecks.
+ // In that case segment won't be copied.
+ when(segment.baseOffset()).thenReturn(segmentStartOffset);
+
+ when(mockLog.activeSegment()).thenReturn(segment);
+ when(mockLog.logStartOffset()).thenReturn(segmentStartOffset);
+ when(mockLog.logSegments(anyLong(),
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment)));
+ when(mockLog.lastStableOffset()).thenReturn(150L);
+
+ RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
+ task.copyLogSegmentsToRemote(mockLog);
+
+ // verify the remoteLogMetadataManager never add any metadata and
remoteStorageManager never copy log segments
+ // Since segment with index corruption should not be uploaded
+ verify(remoteLogMetadataManager,
never()).addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class));
+ verify(remoteStorageManager,
never()).copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
any(LogSegmentData.class));
+ verify(remoteLogMetadataManager,
never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
+ }
+
+
+ @Test
+ void testCorruptedTimeIndexes() throws Exception {
+ // copyLogSegment is executed in case we have more than 1 segment,
what is why we create 2 of them
+ long oldSegmentStartOffset = 0L;
+ long nextSegmentStartOffset = 150L;
+
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+ // leader epoch preparation
+ checkpoint.write(totalEpochEntries);
+ LeaderEpochFileCache cache = new
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+ when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
anyInt())).thenReturn(Optional.of(0L));
+
+ File tempFile = TestUtils.tempFile();
+ File mockProducerSnapshotIndex = TestUtils.tempFile();
+ File tempDir = TestUtils.tempDirectory();
+ // create 2 log segments, with 0 and 150 as log start offset
+ LogSegment oldSegment = mock(LogSegment.class);
+ LogSegment activeSegment = mock(LogSegment.class);
+
+ when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+ when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+ // Mock all required segment data to be managed by RLMTask
+ FileRecords fileRecords = mock(FileRecords.class);
+ when(oldSegment.log()).thenReturn(fileRecords);
+ when(fileRecords.file()).thenReturn(tempFile);
+ when(fileRecords.sizeInBytes()).thenReturn(10);
+ when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+ File txnFile1 = UnifiedLog.transactionIndexFile(tempDir,
oldSegmentStartOffset, "");
+ txnFile1.createNewFile();
+
+ File timeindexFile = TestUtils.tempFile();
+
+ TimeIndex ti = spy(new TimeIndex(timeindexFile, 45L, 12));
+ when(ti.entries()).thenReturn(1);
+ // One of the checks to detect index being corrupted is checking that:
+ // offset < baseOffset, BaseOffset is 45, offset => 0
+ when(ti.lastEntry()).thenReturn(new TimestampOffset(0L, 0L));
+
+ // Initialise timeIndex for oldSegment
+ when(oldSegment.timeIndex()).thenReturn(ti);
+ when(oldSegment.txnIndex()).thenReturn(new
TransactionIndex(nextSegmentStartOffset, txnFile1));
+ when(oldSegment.offsetIndex()).thenReturn(new
OffsetIndex(TestUtils.tempFile(),
+ oldSegmentStartOffset, maxEntries * 8));
+
+ File txnFile2 = UnifiedLog.transactionIndexFile(tempDir,
nextSegmentStartOffset, "");
+ txnFile2.createNewFile();
+
+ when(activeSegment.timeIndex()).thenReturn(new
TimeIndex(TestUtils.tempFile(), 45L, 1));
+ when(activeSegment.log()).thenReturn(fileRecords);
+ when(activeSegment.txnIndex()).thenReturn(new
TransactionIndex(nextSegmentStartOffset, txnFile2));
+ when(activeSegment.offsetIndex()).thenReturn(new
OffsetIndex(TestUtils.tempFile(),
+ nextSegmentStartOffset, maxEntries * 8));
+
+ when(mockLog.activeSegment()).thenReturn(activeSegment);
+ when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+ when(mockLog.logSegments(anyLong(),
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment,
activeSegment)));
+
+ ProducerStateManager mockStateManager =
mock(ProducerStateManager.class);
+ when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+ when(mockLog.lastStableOffset()).thenReturn(250L);
+
+
+ OffsetIndex idx =
LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir,
oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+ TimeIndex timeIdx =
LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset,
""), oldSegmentStartOffset, 1500).get();
+ File txnFile = UnifiedLog.transactionIndexFile(tempDir,
oldSegmentStartOffset, "");
+ txnFile.createNewFile();
+ TransactionIndex txnIndex = new
TransactionIndex(oldSegmentStartOffset, txnFile);
+ when(oldSegment.timeIndex()).thenReturn(timeIdx);
+ when(oldSegment.offsetIndex()).thenReturn(idx);
+ when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+ CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+ dummyFuture.complete(null);
+
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+
doNothing().when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
any(LogSegmentData.class));
+
+ RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
+ // Just simulate situation when we upload next segment from another
leader
+ task.convertToLeader(2);
+ // Here CorruptIndex will be thrown and caught by
copyLogSegmentsToRemote
+ task.copyLogSegmentsToRemote(mockLog);
+
+ // verify the remoteLogMetadataManager never add any metadata and
remoteStorageManager never copy log segments
+ // Since segment with index corruption should not be uploaded
+ verify(remoteLogMetadataManager,
never()).addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class));
Review Comment:
Can you please verify that copyLogSegment() was called? Otherwise, we will
miss cases where the upload was skipped for some other reason.
##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -2482,6 +2486,134 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata
remoteLogSegmentMetadata, l
}
}
+ @Test
+ void testCopyLogSegmentsToRemoteShouldNotCopySegmentWithMissingIndexes()
throws Exception {
+ long segmentStartOffset = 0L;
+
+ // leader epoch preparation
+ checkpoint.write(totalEpochEntries);
+ LeaderEpochFileCache cache = new
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+ when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
anyInt())).thenReturn(Optional.of(0L));
+
+ // create log segment, with 0 as log start offset
+ LogSegment segment = mock(LogSegment.class);
+
+ // Segment does not nage timeIndex() what is not acceptable to
sanityChecks.
+ // In that case segment won't be copied.
+ when(segment.baseOffset()).thenReturn(segmentStartOffset);
+
+ when(mockLog.activeSegment()).thenReturn(segment);
+ when(mockLog.logStartOffset()).thenReturn(segmentStartOffset);
+ when(mockLog.logSegments(anyLong(),
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment)));
+ when(mockLog.lastStableOffset()).thenReturn(150L);
+
+ RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
+ task.copyLogSegmentsToRemote(mockLog);
+
+ // verify the remoteLogMetadataManager never add any metadata and
remoteStorageManager never copy log segments
+ // Since segment with index corruption should not be uploaded
+ verify(remoteLogMetadataManager,
never()).addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class));
+ verify(remoteStorageManager,
never()).copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
any(LogSegmentData.class));
+ verify(remoteLogMetadataManager,
never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
+ }
+
+
+ @Test
+ void testCorruptedTimeIndexes() throws Exception {
+ // copyLogSegment is executed in case we have more than 1 segment,
what is why we create 2 of them
+ long oldSegmentStartOffset = 0L;
+ long nextSegmentStartOffset = 150L;
+
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+ // leader epoch preparation
+ checkpoint.write(totalEpochEntries);
+ LeaderEpochFileCache cache = new
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+ when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
anyInt())).thenReturn(Optional.of(0L));
+
+ File tempFile = TestUtils.tempFile();
+ File mockProducerSnapshotIndex = TestUtils.tempFile();
+ File tempDir = TestUtils.tempDirectory();
+ // create 2 log segments, with 0 and 150 as log start offset
+ LogSegment oldSegment = mock(LogSegment.class);
+ LogSegment activeSegment = mock(LogSegment.class);
+
+ when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+ when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+ // Mock all required segment data to be managed by RLMTask
+ FileRecords fileRecords = mock(FileRecords.class);
+ when(oldSegment.log()).thenReturn(fileRecords);
+ when(fileRecords.file()).thenReturn(tempFile);
+ when(fileRecords.sizeInBytes()).thenReturn(10);
+ when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+ File txnFile1 = UnifiedLog.transactionIndexFile(tempDir,
oldSegmentStartOffset, "");
+ txnFile1.createNewFile();
+
+ File timeindexFile = TestUtils.tempFile();
+
+ TimeIndex ti = spy(new TimeIndex(timeindexFile, 45L, 12));
Review Comment:
Another suggestion: please name the variable corruptedTimeIndex.
##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -2482,6 +2486,134 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata
remoteLogSegmentMetadata, l
}
}
+ @Test
+ void testCopyLogSegmentsToRemoteShouldNotCopySegmentWithMissingIndexes()
throws Exception {
+ long segmentStartOffset = 0L;
+
+ // leader epoch preparation
+ checkpoint.write(totalEpochEntries);
+ LeaderEpochFileCache cache = new
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+ when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
anyInt())).thenReturn(Optional.of(0L));
+
+ // create log segment, with 0 as log start offset
+ LogSegment segment = mock(LogSegment.class);
+
+ // Segment does not nage timeIndex() what is not acceptable to
sanityChecks.
+ // In that case segment won't be copied.
+ when(segment.baseOffset()).thenReturn(segmentStartOffset);
+
+ when(mockLog.activeSegment()).thenReturn(segment);
+ when(mockLog.logStartOffset()).thenReturn(segmentStartOffset);
+ when(mockLog.logSegments(anyLong(),
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment)));
+ when(mockLog.lastStableOffset()).thenReturn(150L);
+
+ RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
+ task.copyLogSegmentsToRemote(mockLog);
+
+ // verify the remoteLogMetadataManager never add any metadata and
remoteStorageManager never copy log segments
+ // Since segment with index corruption should not be uploaded
+ verify(remoteLogMetadataManager,
never()).addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class));
+ verify(remoteStorageManager,
never()).copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
any(LogSegmentData.class));
+ verify(remoteLogMetadataManager,
never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
+ }
+
+
+ @Test
+ void testCorruptedTimeIndexes() throws Exception {
+ // copyLogSegment is executed in case we have more than 1 segment,
what is why we create 2 of them
+ long oldSegmentStartOffset = 0L;
+ long nextSegmentStartOffset = 150L;
+
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+ // leader epoch preparation
+ checkpoint.write(totalEpochEntries);
+ LeaderEpochFileCache cache = new
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+ when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
anyInt())).thenReturn(Optional.of(0L));
+
+ File tempFile = TestUtils.tempFile();
+ File mockProducerSnapshotIndex = TestUtils.tempFile();
+ File tempDir = TestUtils.tempDirectory();
+ // create 2 log segments, with 0 and 150 as log start offset
+ LogSegment oldSegment = mock(LogSegment.class);
+ LogSegment activeSegment = mock(LogSegment.class);
+
+ when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+ when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+ // Mock all required segment data to be managed by RLMTask
+ FileRecords fileRecords = mock(FileRecords.class);
+ when(oldSegment.log()).thenReturn(fileRecords);
+ when(fileRecords.file()).thenReturn(tempFile);
+ when(fileRecords.sizeInBytes()).thenReturn(10);
+ when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+ File txnFile1 = UnifiedLog.transactionIndexFile(tempDir,
oldSegmentStartOffset, "");
+ txnFile1.createNewFile();
+
+ File timeindexFile = TestUtils.tempFile();
+
+ TimeIndex ti = spy(new TimeIndex(timeindexFile, 45L, 12));
Review Comment:
Alternatively, can we simply use a mock of TimeIndex here instead of a spy?
##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -2482,6 +2486,134 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata
remoteLogSegmentMetadata, l
}
}
+ /**
+  * Verifies that copyLogSegmentsToRemote neither adds metadata, copies segment data,
+  * nor updates metadata when the only log segment has no indexes stubbed.
+  */
+ @Test
+ void testCopyLogSegmentsToRemoteShouldNotCopySegmentWithMissingIndexes()
throws Exception {
+ long segmentStartOffset = 0L;
+
+ // leader epoch preparation
+ checkpoint.write(totalEpochEntries);
+ LeaderEpochFileCache cache = new
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+ when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
anyInt())).thenReturn(Optional.of(0L));
+
+ // create log segment, with 0 as log start offset
+ LogSegment segment = mock(LogSegment.class);
+
+ // The segment mock does not stub timeIndex(), which is not acceptable to
sanityChecks.
+ // In that case the segment won't be copied.
+ when(segment.baseOffset()).thenReturn(segmentStartOffset);
+
+ // NOTE(review): this single segment is also the active segment. testCorruptedTimeIndexes
+ // states that copyLogSegment only runs when there is more than one segment, so this test
+ // may pass without ever reaching the index sanity checks. Consider adding a second
+ // (active) segment, as testCorruptedTimeIndexes does. TODO confirm.
+ when(mockLog.activeSegment()).thenReturn(segment);
+ when(mockLog.logStartOffset()).thenReturn(segmentStartOffset);
+ when(mockLog.logSegments(anyLong(),
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment)));
+ when(mockLog.lastStableOffset()).thenReturn(150L);
+
+ RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
+ task.copyLogSegmentsToRemote(mockLog);
+
+ // verify that remoteLogMetadataManager never adds any metadata and
remoteStorageManager never copies log segments,
+ // since a segment with corrupted or missing indexes must not be uploaded
+ verify(remoteLogMetadataManager,
never()).addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class));
+ verify(remoteStorageManager,
never()).copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
any(LogSegmentData.class));
+ verify(remoteLogMetadataManager,
never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
+ }
+
+
+ @Test
+ void testCorruptedTimeIndexes() throws Exception {
+ // copyLogSegment is executed in case we have more than 1 segment,
what is why we create 2 of them
+ long oldSegmentStartOffset = 0L;
+ long nextSegmentStartOffset = 150L;
+
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+ // leader epoch preparation
+ checkpoint.write(totalEpochEntries);
+ LeaderEpochFileCache cache = new
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+ when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
anyInt())).thenReturn(Optional.of(0L));
+
+ File tempFile = TestUtils.tempFile();
+ File mockProducerSnapshotIndex = TestUtils.tempFile();
+ File tempDir = TestUtils.tempDirectory();
+ // create 2 log segments, with 0 and 150 as log start offset
+ LogSegment oldSegment = mock(LogSegment.class);
+ LogSegment activeSegment = mock(LogSegment.class);
+
+ when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+ when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+ // Mock all required segment data to be managed by RLMTask
+ FileRecords fileRecords = mock(FileRecords.class);
+ when(oldSegment.log()).thenReturn(fileRecords);
+ when(fileRecords.file()).thenReturn(tempFile);
+ when(fileRecords.sizeInBytes()).thenReturn(10);
+ when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+ File txnFile1 = UnifiedLog.transactionIndexFile(tempDir,
oldSegmentStartOffset, "");
+ txnFile1.createNewFile();
+
+ File timeindexFile = TestUtils.tempFile();
+
+ TimeIndex ti = spy(new TimeIndex(timeindexFile, 45L, 12));
+ when(ti.entries()).thenReturn(1);
+ // One of the checks to detect index being corrupted is checking that:
+ // offset < baseOffset, BaseOffset is 45, offset => 0
+ when(ti.lastEntry()).thenReturn(new TimestampOffset(0L, 0L));
+
+ // Initialise timeIndex for oldSegment
+ when(oldSegment.timeIndex()).thenReturn(ti);
+ when(oldSegment.txnIndex()).thenReturn(new
TransactionIndex(nextSegmentStartOffset, txnFile1));
+ when(oldSegment.offsetIndex()).thenReturn(new
OffsetIndex(TestUtils.tempFile(),
+ oldSegmentStartOffset, maxEntries * 8));
+
+ File txnFile2 = UnifiedLog.transactionIndexFile(tempDir,
nextSegmentStartOffset, "");
+ txnFile2.createNewFile();
+
+ when(activeSegment.timeIndex()).thenReturn(new
TimeIndex(TestUtils.tempFile(), 45L, 1));
+ when(activeSegment.log()).thenReturn(fileRecords);
+ when(activeSegment.txnIndex()).thenReturn(new
TransactionIndex(nextSegmentStartOffset, txnFile2));
+ when(activeSegment.offsetIndex()).thenReturn(new
OffsetIndex(TestUtils.tempFile(),
+ nextSegmentStartOffset, maxEntries * 8));
+
+ when(mockLog.activeSegment()).thenReturn(activeSegment);
+ when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+ when(mockLog.logSegments(anyLong(),
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment,
activeSegment)));
+
+ ProducerStateManager mockStateManager =
mock(ProducerStateManager.class);
+ when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+ when(mockLog.lastStableOffset()).thenReturn(250L);
+
+
+ OffsetIndex idx =
LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir,
oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+ TimeIndex timeIdx =
LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset,
""), oldSegmentStartOffset, 1500).get();
+ File txnFile = UnifiedLog.transactionIndexFile(tempDir,
oldSegmentStartOffset, "");
+ txnFile.createNewFile();
+ TransactionIndex txnIndex = new
TransactionIndex(oldSegmentStartOffset, txnFile);
+ when(oldSegment.timeIndex()).thenReturn(timeIdx);
+ when(oldSegment.offsetIndex()).thenReturn(idx);
+ when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+ CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+ dummyFuture.complete(null);
Review Comment:
nit: in Java you can create an already-completed future directly using
`CompletableFuture.completedFuture(null)` instead of constructing one and
calling `complete(null)`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]