gharris1727 commented on code in PR #15165:
URL: https://github.com/apache/kafka/pull/15165#discussion_r1619287340
##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -254,6 +259,61 @@ public void testCanProcessRecord() throws InterruptedException {
assertEquals(3, handler.metadataCounter);
}
+ @Test
+ public void testCanReprocessSkippedRecords() throws InterruptedException {
+ final Uuid topicId = Uuid.fromString("Bp9TDduJRGa9Q5rlvCJOxg");
+ final TopicIdPartition tpId0 = new TopicIdPartition(topicId, new TopicPartition("sample", 0));
+ final TopicIdPartition tpId1 = new TopicIdPartition(topicId, new TopicPartition("sample", 1));
+ final TopicIdPartition tpId3 = new TopicIdPartition(topicId, new TopicPartition("sample", 3));
+ assertEquals(partitioner.metadataPartition(tpId0), partitioner.metadataPartition(tpId1));
+ assertNotEquals(partitioner.metadataPartition(tpId3), partitioner.metadataPartition(tpId0));
+
+ final int metadataPartition = partitioner.metadataPartition(tpId0);
+ final int anotherMetadataPartition = partitioner.metadataPartition(tpId3);
+
+ // Mocking the consumer to be able to wait for the second reassignment
+ doAnswer(invocation -> {
Review Comment:
I don't like this mock-of-a-mock, but it appears necessary because
ConsumerTaskTest is a multithreaded test.
The existing mocking mechanism for MockConsumer, schedulePollTask, is
ineffective when the background thread is calling poll() _hundreds of
thousands of times_. This isn't for you to solve, so I won't block this PR on
that change. I've opened https://issues.apache.org/jira/browse/KAFKA-16862 to
track the improvement.
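
For anyone reading along, here is a rough, dependency-free sketch of the
synchronization idea as I understand it: wrap the consumer so that every
assign() call counts down a latch, and have the test thread wait on that latch
rather than trying to schedule work on a specific poll(). Everything in it
(MiniConsumer, FakeConsumer, LatchingConsumer, awaitSecondAssignment) is a
hypothetical stand-in and not the Kafka or Mockito API. In the PR itself the
wrapping is done with doAnswer on the mocked consumer.

```java
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class AssignmentLatchSketch {

    /** Hypothetical stand-in for the two consumer calls the background task makes. */
    interface MiniConsumer {
        void assign(Set<Integer> partitions);
        void poll();
    }

    /** Plays the role of the mock consumer: remembers the latest assignment. */
    static class FakeConsumer implements MiniConsumer {
        volatile Set<Integer> assignment = Set.of();

        @Override public void assign(Set<Integer> partitions) { assignment = partitions; }
        @Override public void poll() { /* nothing to fetch in this sketch */ }
    }

    /** Plays the role of the doAnswer wrapper: delegate first, then signal the latch. */
    static class LatchingConsumer implements MiniConsumer {
        private final MiniConsumer delegate;
        private final CountDownLatch assignments;

        LatchingConsumer(MiniConsumer delegate, CountDownLatch assignments) {
            this.delegate = delegate;
            this.assignments = assignments;
        }

        @Override public void assign(Set<Integer> partitions) {
            delegate.assign(partitions);
            assignments.countDown();
        }

        @Override public void poll() { delegate.poll(); }
    }

    /** Returns true once the second assignment has been observed by the test thread. */
    static boolean awaitSecondAssignment() {
        CountDownLatch secondAssignment = new CountDownLatch(2);
        FakeConsumer fake = new FakeConsumer();
        MiniConsumer consumer = new LatchingConsumer(fake, secondAssignment);
        AtomicBoolean running = new AtomicBoolean(true);

        // Background loop that polls far too often to target any single poll() call.
        Thread background = new Thread(() -> {
            consumer.assign(Set.of(0));
            for (int i = 0; i < 100_000; i++) {
                consumer.poll();
            }
            consumer.assign(Set.of(0, 3));
            while (running.get()) {
                consumer.poll();
            }
        });
        background.setDaemon(true);
        background.start();

        try {
            // Block on the event itself instead of guessing which poll() precedes it.
            boolean seen = secondAssignment.await(10, TimeUnit.SECONDS);
            return seen && fake.assignment.equals(Set.of(0, 3));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            running.set(false);
        }
    }

    public static void main(String[] args) {
        System.out.println("second assignment observed: " + awaitSecondAssignment());
    }
}
```

The point of the wrapper is that the test no longer depends on how many times
poll() has run, only on the reassignment having happened.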