xiaoqingwanga commented on code in PR #16303:
URL: https://github.com/apache/kafka/pull/16303#discussion_r1639071233
##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -136,46 +128,46 @@ public void testUserTopicIdPartitionEquals() {
}
@Test
- public void testAddAssignmentsForPartitions() throws InterruptedException {
+ public void testAddAssignmentsForPartitions() {
final List<TopicIdPartition> idPartitions = getIdPartitions("sample",
3);
final Map<TopicPartition, Long> endOffsets = idPartitions.stream()
.map(idp ->
toRemoteLogPartition(partitioner.metadataPartition(idp)))
.collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) ->
b));
consumer.updateEndOffsets(endOffsets);
consumerTask.addAssignmentsForPartitions(new HashSet<>(idPartitions));
- thread.start();
+ consumerTask.ingestRecords();
for (final TopicIdPartition idPartition : idPartitions) {
- TestUtils.waitForCondition(() ->
consumerTask.isUserPartitionAssigned(idPartition), "Timed out waiting for " +
idPartition + " to be assigned");
assertTrue(consumerTask.isMetadataPartitionAssigned(partitioner.metadataPartition(idPartition)));
assertTrue(handler.isPartitionLoaded.get(idPartition));
}
}
@Test
- public void testRemoveAssignmentsForPartitions() throws
InterruptedException {
+ public void testRemoveAssignmentsForPartitions() {
final List<TopicIdPartition> allPartitions = getIdPartitions("sample",
3);
final Map<TopicPartition, Long> endOffsets = allPartitions.stream()
.map(idp ->
toRemoteLogPartition(partitioner.metadataPartition(idp)))
.collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) ->
b));
consumer.updateEndOffsets(endOffsets);
consumerTask.addAssignmentsForPartitions(new HashSet<>(allPartitions));
- thread.start();
+ consumerTask.ingestRecords();
final TopicIdPartition tpId = allPartitions.get(0);
- TestUtils.waitForCondition(() ->
consumerTask.isUserPartitionAssigned(tpId), "Timed out waiting for " + tpId + "
to be assigned");
addRecord(consumer, partitioner.metadataPartition(tpId), tpId, 0);
- TestUtils.waitForCondition(() ->
consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent(),
- "Couldn't read record");
+ consumerTask.ingestRecords();
+ assertTrue(() ->
consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent(),
Review Comment:
I see 😉 — there was a bit of a misunderstanding earlier.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]