AndrewJSchofield commented on code in PR #20388:
URL: https://github.com/apache/kafka/pull/20388#discussion_r2381595668
##########
tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java:
##########
@@ -230,24 +231,32 @@ private static void printExtendedProgress(long bytesRead,
public static class ConsumerPerfRebListener implements
ConsumerRebalanceListener {
private final AtomicLong joinTimeMs;
private final AtomicLong joinTimeMsInSingleRound;
+ private final Collection<TopicPartition> assignedPartitions;
private long joinStartMs;
public ConsumerPerfRebListener(AtomicLong joinTimeMs, long
joinStartMs, AtomicLong joinTimeMsInSingleRound) {
this.joinTimeMs = joinTimeMs;
this.joinStartMs = joinStartMs;
this.joinTimeMsInSingleRound = joinTimeMsInSingleRound;
+ this.assignedPartitions = new HashSet<>();
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions)
{
- joinStartMs = System.currentTimeMillis();
+ assignedPartitions.removeAll(partitions);
+ if (assignedPartitions.isEmpty()) {
+ joinStartMs = System.currentTimeMillis();
+ }
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition>
partitions) {
- long elapsedMs = System.currentTimeMillis() - joinStartMs;
- joinTimeMs.addAndGet(elapsedMs);
- joinTimeMsInSingleRound.addAndGet(elapsedMs);
+ if (assignedPartitions.isEmpty()) {
+ long elapsedMs = System.currentTimeMillis() - joinStartMs;
+ joinTimeMs.addAndGet(elapsedMs);
+ joinTimeMsInSingleRound.addAndGet(elapsedMs);
Review Comment:
I know this isn't strictly fixing the same problem, but isn't
`joinTimeMsInSingleRound` flawed? The rebalance listener is passed a reference
to an `AtomicLong` in its constructor, but the loop in the `consume()` method
creates a new `AtomicLong` in each reporting interval, so the listener keeps
adding to the original instance rather than the one the loop is using. The loop
should probably reset the existing `AtomicLong` to zero on each reporting
interval instead. wdyt?
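
To illustrate the concern, here is a minimal, self-contained sketch. It is not the actual `ConsumerPerformance` code: the `Listener` class, the method names, and the 250 ms value are hypothetical stand-ins, and the only assumption is the pattern described above (a listener that keeps the `AtomicLong` reference it was constructed with, and a loop that either replaces or resets the counter at each reporting interval).

```java
import java.util.concurrent.atomic.AtomicLong;

public class JoinTimeResetSketch {

    // Hypothetical stand-in for the rebalance listener: it keeps the
    // AtomicLong reference it was given at construction time.
    static final class Listener {
        private final AtomicLong joinTimeMsInSingleRound;

        Listener(AtomicLong joinTimeMsInSingleRound) {
            this.joinTimeMsInSingleRound = joinTimeMsInSingleRound;
        }

        void recordJoin(long elapsedMs) {
            joinTimeMsInSingleRound.addAndGet(elapsedMs);
        }
    }

    // Pattern under discussion: the loop swaps in a new AtomicLong at the
    // start of a reporting interval, so the listener's updates go to the
    // old object and the loop never sees them.
    static long reportAfterReplacing() {
        AtomicLong counter = new AtomicLong(0);
        Listener listener = new Listener(counter);
        counter = new AtomicLong(0);   // new interval: listener still holds the old instance
        listener.recordJoin(250);      // hypothetical 250 ms rebalance
        return counter.get();
    }

    // Suggested alternative: keep one shared AtomicLong and zero it at the
    // start of each reporting interval.
    static long reportAfterResetting() {
        AtomicLong counter = new AtomicLong(0);
        Listener listener = new Listener(counter);
        counter.set(0);                // new interval: same instance, value reset
        listener.recordJoin(250);
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("replace each interval: " + reportAfterReplacing());
        System.out.println("reset each interval:   " + reportAfterResetting());
    }
}
```

With the replacing variant the loop reads 0 even though a join was recorded; with the resetting variant it reads 250. If the value has to be read and cleared in one step, `AtomicLong.getAndSet(0)` would be one way to do that.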