abhijeetk88 commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1622418254
##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -2716,6 +2728,198 @@ public void testFetchQuotaManagerConfig() {
assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds());
}
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testCopyQuota(boolean quotaExceeded) throws Exception {
+ long oldSegmentStartOffset = 0L;
+ long nextSegmentStartOffset = 150L;
+
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+ // leader epoch preparation
+ checkpoint.write(totalEpochEntries);
+ LeaderEpochFileCache cache = new
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+ when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
anyInt())).thenReturn(Optional.of(0L));
+
+ // create 2 log segments, with 0 and 150 as log start offset
+ LogSegment oldSegment = mock(LogSegment.class);
+ LogSegment activeSegment = mock(LogSegment.class);
+
+ File tempFile = TestUtils.tempFile();
+ FileRecords fileRecords = mock(FileRecords.class);
+ when(fileRecords.file()).thenReturn(tempFile);
+ when(fileRecords.sizeInBytes()).thenReturn(10);
+
+ // Set up the segment that is eligible for copy
+ when(oldSegment.log()).thenReturn(fileRecords);
+ when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+ when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+ // set up the active segment
+ when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+ when(mockLog.activeSegment()).thenReturn(activeSegment);
+ when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+ when(mockLog.logSegments(anyLong(),
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment,
activeSegment)));
+
+ File mockProducerSnapshotIndex = TestUtils.tempFile();
+ ProducerStateManager mockStateManager =
mock(ProducerStateManager.class);
+
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+
+ when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+ when(mockLog.lastStableOffset()).thenReturn(250L);
+
+ File tempDir = TestUtils.tempDirectory();
+ OffsetIndex idx =
LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir,
oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+ TimeIndex timeIdx =
LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset,
""), oldSegmentStartOffset, 1500).get();
+ File txnFile = UnifiedLog.transactionIndexFile(tempDir,
oldSegmentStartOffset, "");
+ txnFile.createNewFile();
+ TransactionIndex txnIndex = new
TransactionIndex(oldSegmentStartOffset, txnFile);
+ when(oldSegment.timeIndex()).thenReturn(timeIdx);
+ when(oldSegment.offsetIndex()).thenReturn(idx);
+ when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+ CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+ dummyFuture.complete(null);
+
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
any(LogSegmentData.class))).thenReturn(Optional.empty());
+
+ when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(quotaExceeded);
+ doNothing().when(rlmCopyQuotaManager).record(anyInt());
+
+ RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
+ task.convertToLeader(2);
+
+ if (quotaExceeded) {
+ // Verify that the copy operation times out, since no segments can
be copied due to quota being exceeded
+ try {
+ assertTimeoutPreemptively(Duration.ofSeconds(1), () ->
task.copyLogSegmentsToRemote(mockLog));
+ fail(EXPECTED_THE_OPERATION_TO_TIME_OUT);
+ } catch (AssertionFailedError e) {
+ // Fail the test if the operation completed within the timeout
+ if (e.getMessage().equals(EXPECTED_THE_OPERATION_TO_TIME_OUT))
{
+ fail(e.getMessage());
+ }
+ }
Review Comment:
The suggested approach will not work.
The `assertTimeoutPreemptively` method throws an `AssertionFailedError` if the
executable does not complete within the timeout.
I want to verify that the timeout occurs (i.e. check for an
`AssertionFailedError`) and fail the test if it does not (the `fail` call also
throws an `AssertionFailedError`). In the catch block, I am checking whether
the `AssertionFailedError` was caused by the timeout or by the explicit `fail`
call.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]