xiaoqingwanga commented on code in PR #16303:
URL: https://github.com/apache/kafka/pull/16303#discussion_r1643456559
##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -309,49 +300,52 @@ public void testCanReprocessSkippedRecords() throws
InterruptedException {
// explicitly re-adding the records since MockConsumer drops them on
poll.
addRecord(consumer, metadataPartition, tpId1, 0);
addRecord(consumer, metadataPartition, tpId0, 1);
+ consumerTask.ingestRecords();
// Waiting for all metadata records to be re-read from the first
metadata partition number
- TestUtils.waitForCondition(() ->
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)),
"Couldn't read record");
+ assertEquals(Optional.of(1L),
consumerTask.readOffsetForMetadataPartition(metadataPartition));
// Verifying that all the metadata records from the first metadata
partition were processed properly.
- TestUtils.waitForCondition(() -> handler.metadataCounter == 2,
"Couldn't read record");
+ assertEquals(2, handler.metadataCounter);
}
@Test
- public void testMaybeMarkUserPartitionsAsReady() throws
InterruptedException {
+ public void testMaybeMarkUserPartitionsAsReady() {
final TopicIdPartition tpId = getIdPartitions("hello", 1).get(0);
final int metadataPartition = partitioner.metadataPartition(tpId);
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
2L));
consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
- thread.start();
+ consumerTask.ingestRecords();
- TestUtils.waitForCondition(() ->
consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be
assigned");
+ assertTrue(consumerTask.isUserPartitionAssigned(tpId), "Partition " +
tpId + " has not been assigned");
assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));
assertFalse(handler.isPartitionInitialized.containsKey(tpId));
IntStream.range(0, 5).forEach(offset -> addRecord(consumer,
metadataPartition, tpId, offset));
- TestUtils.waitForCondition(() ->
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(4L)),
"Couldn't read record");
+ consumerTask.ingestRecords();
+ assertEquals(Optional.of(4L),
consumerTask.readOffsetForMetadataPartition(metadataPartition));
assertTrue(handler.isPartitionInitialized.get(tpId));
}
@ParameterizedTest
@CsvSource(value = {"0, 0", "500, 500"})
- public void testMaybeMarkUserPartitionAsReadyWhenTopicIsEmpty(long
beginOffset,
- long
endOffset) throws InterruptedException {
+ public void testMaybeMarkUserPartitionAsReadyWhenTopicIsEmpty(long
beginOffset, long endOffset) {
final TopicIdPartition tpId = getIdPartitions("world", 1).get(0);
final int metadataPartition = partitioner.metadataPartition(tpId);
consumer.updateBeginningOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
beginOffset));
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
endOffset));
consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
- thread.start();
+ consumerTask.ingestRecords();
- TestUtils.waitForCondition(() ->
consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be
assigned");
+ assertTrue(consumerTask.isUserPartitionAssigned(tpId), "Partition " +
tpId + " has not been assigned");
assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));
- TestUtils.waitForCondition(() ->
handler.isPartitionInitialized.containsKey(tpId),
- "should have initialized the partition");
+ assertTrue(handler.isPartitionInitialized.containsKey(tpId), "Should
have initialized the partition");
assertFalse(consumerTask.readOffsetForMetadataPartition(metadataPartition).isPresent());
}
@Test
public void testConcurrentAccess() throws InterruptedException {
- thread.start();
+ // Here we need to test concurrent access. When ConsumerTask is
ingesting records,
+ // we need to concurrently add partitions and perform close()
+ new Thread(consumerTask).start();
Review Comment:
I don't necessarily think it needs join(), because this thread will be shut
down with close(). Even if, for some reason, it doesn't get closed, the
resources will be freed when the test ends anyway.
Just to be extra careful, I'll give it a try.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]