[
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17146724#comment-17146724
]
Neo Wu commented on KAFKA-10134:
--------------------------------
Something like this fixes my issue, but I am not sure whether this is the
right thing to do for the bigger picture:
{code:java}
// poll for new data until the timeout expires
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = null;
do {
    client.maybeTriggerWakeup();

    if (includeMetadataInTimeout) {
        // try to update assignment metadata BUT do not need to block on the timer if we still have
        // some assigned partitions, since even if we are 1) in the middle of a rebalance
        // or 2) have partitions with unknown starting positions we may still want to return some data
        // as long as there are some partitions fetchable; NOTE we always use a timer with 0ms
        // to never block on completing the rebalance procedure if there's any
        if (subscriptions.fetchablePartitions(tp -> true).isEmpty() || records == null || records.isEmpty()) {
            updateAssignmentMetadataIfNeeded(timer);
        } else {
            final Timer updateMetadataTimer = time.timer(0L);
            updateAssignmentMetadataIfNeeded(updateMetadataTimer);
            timer.update(updateMetadataTimer.currentTimeMs());
        }
    } else {
        while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
            log.warn("Still waiting for metadata");
        }
    }

    records = pollForFetches(timer);
    if (!records.isEmpty()) {
        // before returning the fetched records, we can send off the next round of fetches
        // and avoid block waiting for their responses to enable pipelining while the user
        // is handling the fetched records.
        //
        // NOTE: since the consumed position has already been updated, we must not allow
        // wakeups or any other errors to be triggered prior to returning the fetched records.
        if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
            client.transmitSends();
        }
        return this.interceptors.onConsume(new ConsumerRecords<>(records));
    }
} while (timer.notExpired());
{code}
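The idea is that when we still have fetchable partitions and the previous poll
returned records, the metadata update runs with a 0 ms timer instead of
blocking on the caller's poll timer (whose elapsed time is still accounted for
via timer.update()), so poll() can keep returning data while the rebalance
completes in the background.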
> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> ------------------------------------------------------------------------
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 2.5.0
> Reporter: Sean Guo
> Assignee: Guozhang Wang
> Priority: Blocker
> Fix For: 2.6.0, 2.5.1
>
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world
> effect during the rebalance, as our tasks are long-running. But after the
> upgrade, when we kill an instance to trigger a rebalance while there is some
> load (some tasks run >30s), the CPU goes sky-high. It reads ~700% in our
> metrics, so several threads must be in a tight loop. We have several
> consumer threads consuming from different partitions during the rebalance.
> This is reproducible with both the new CooperativeStickyAssignor and the old
> eager rebalance protocol. The difference is that with the old eager protocol
> the high CPU usage drops after the rebalance is done, but with the
> cooperative one the consumer threads seem stuck on something and cannot
> finish the rebalance, so the high CPU usage does not drop until we stop our
> load. A small load without long-running tasks also does not cause continuous
> high CPU usage, as the rebalance can finish in that case.
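> 
> For context, a minimal sketch of our consumer setup (broker address, group
> id, topic name, and processing logic are placeholders), with the assignor
> switched to the cooperative protocol:
> {code:java}
> import java.time.Duration;
> import java.util.List;
> import java.util.Properties;
> 
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.serialization.StringDeserializer;
> 
> public class LongRunningConsumer {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
>         props.put(ConsumerConfig.GROUP_ID_CONFIG, "long-running-tasks");      // placeholder
>         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
>         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
>         // switch from the default eager assignor to the cooperative protocol
>         props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
>                 CooperativeStickyAssignor.class.getName());
> 
>         try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
>             consumer.subscribe(List.of("events")); // placeholder topic
>             while (true) {
>                 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
>                 for (ConsumerRecord<String, String> record : records) {
>                     process(record); // long-running work, sometimes >30s
>                 }
>             }
>         }
>     }
> 
>     private static void process(ConsumerRecord<String, String> record) {
>         // stand-in for our long-running task
>     }
> }
> {code}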
>
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0
> cpu=76853.07ms elapsed=841.16s tid=0x00007fe11f044000 nid=0x1f4 runnable
> [0x00007fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5
> os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x00007fe11f044000 nid=0x1f4
> runnable [0x00007fe119aab000] java.lang.Thread.State: RUNNABLE at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
> at
>
> By debugging into the code, we found that the clients appear to be in a loop
> trying to find the coordinator. I also tried the old rebalance protocol with
> the new version; the issue still exists, but the CPU goes back to normal
> when the rebalance is done. We also tried the same on 2.4.1, which does not
> seem to have this issue, so it appears to be related to something that
> changed between 2.4.1 and 2.5.0.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)