chia7712 commented on code in PR #20221:
URL: https://github.com/apache/kafka/pull/20221#discussion_r2228931829
##########
tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java:
##########
@@ -134,8 +136,13 @@ private static void consume(KafkaConsumer<byte[], byte[]> consumer,
long reportingIntervalMs = options.reportingIntervalMs();
boolean showDetailedStats = options.showDetailedStats();
SimpleDateFormat dateFormat = options.dateFormat();
- consumer.subscribe(options.topic(),
- new ConsumerPerfRebListener(joinTimeMs, joinStartMs, joinTimeMsInSingleRound));
+
+ ConsumerPerfRebListener listener = new ConsumerPerfRebListener(joinTimeMs, joinStartMs, joinTimeMsInSingleRound);
Review Comment:
Perhaps `ConsumerPerfRebListener` needs to be updated. It assumes that fetching
can only proceed once partitions are assigned, so it calculates the elapsed
join time in `onPartitionsAssigned`. However, in `include` mode, additional
topics could be assigned to this consumer after fetching has already started.
That could ultimately corrupt `fetchTimeInMs`.
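
To illustrate the concern, here is a minimal sketch that uses no Kafka classes. `JoinTimeSketch` is a hypothetical stand-in, not the real `ConsumerPerfRebListener`. It assumes the listener resets its join start only in `onPartitionsRevoked`, adds `now - joinStart` to the join time in `onPartitionsAssigned`, and that the fetch time is derived as total elapsed time minus join time. Timestamps are passed in explicitly so the result is deterministic.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the listener discussed above; NOT Kafka's actual class.
class JoinTimeSketch {
    private final AtomicLong joinTimeMs = new AtomicLong();
    private long joinStartMs;

    JoinTimeSketch(long joinStartMs) {
        this.joinStartMs = joinStartMs;
    }

    // Assumption: the join start is only reset when partitions are revoked.
    void onPartitionsRevoked(long nowMs) {
        joinStartMs = nowMs;
    }

    // Assumption: everything since joinStartMs is counted as time spent joining.
    void onPartitionsAssigned(long nowMs) {
        joinTimeMs.addAndGet(nowMs - joinStartMs);
    }

    // Assumption: fetch time is the total elapsed time minus the join time.
    long fetchTimeMs(long startMs, long endMs) {
        return (endMs - startMs) - joinTimeMs.get();
    }

    // Simulates a 1000 ms run whose initial join completes at t = 100 ms.
    static long simulate(boolean lateAssignment) {
        JoinTimeSketch listener = new JoinTimeSketch(0);
        listener.onPartitionsAssigned(100);
        if (lateAssignment) {
            // A newly matching topic is assigned at t = 900 ms with no preceding
            // revoke, so 900 ms of fetching is misattributed to joining.
            listener.onPartitionsAssigned(900);
        }
        return listener.fetchTimeMs(0, 1000);
    }

    public static void main(String[] args) {
        System.out.println("single assignment: " + simulate(false) + " ms"); // 900 ms
        System.out.println("late assignment:   " + simulate(true) + " ms");  // 0 ms
    }
}
```

Under these assumptions, one late assignment shrinks the reported fetch time from 900 ms to 0 ms, and further late assignments would push it negative.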