[
https://issues.apache.org/jira/browse/KAFKA-20253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kirk True updated KAFKA-20253:
------------------------------
Component/s: consumer
> High CPU loop on consumer after failed re-authentication
> --------------------------------------------------------
>
> Key: KAFKA-20253
> URL: https://issues.apache.org/jira/browse/KAFKA-20253
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer, security
> Affects Versions: 4.0.0, 4.2.0
> Environment: Kafka Client Version: 4.2.0; it can be reproduced with
> 4.0.0 as well. Other versions were not tried.
> Reporter: Marko Štrukelj
> Priority: Major
>
> Description:
> A consumer can enter a tight spin loop, pinning a CPU core, if it encounters
> a re-authentication failure followed by a successful re-authentication when
> using the SASL_OAUTHBEARER authentication mechanism. This can happen due to
> temporary unavailability of the authorization server: a short-lived outage
> during re-login, when the client needs to obtain a new access token, or
> shortly after, when the broker may need the authorization server to validate
> the token.
> An idle consumer (one assigned 0 partitions, which happens when a consumer
> group has more consumers than topic partitions) is much more likely to
> experience this issue.
> In the ClassicKafkaConsumer the issue shows up as 100% CPU utilization by
> the client application process. In the AsyncKafkaConsumer (KIP-848), the
> application main thread is spared, but the background
> kafka-consumer-network-thread enters a similar loop, with the application
> process utilizing ~40-50% CPU.
> Steps to Reproduce:
> A reproducer with instructions is available at:
> [GitHub|https://github.com/mstruk/kafka-consumer-reproducer]
> Actual Behavior:
> The idle consumer (sometimes the working consumer, sometimes both) enters a
> busy-wait loop after a failed re-authentication followed by a successful
> re-authentication (once the authorization server is restored). For the idle
> consumer the spin loop mostly continues indefinitely; the non-idle consumer
> seems to sometimes recover.
> Expected Behavior:
> The consumer should always sleep between network polls. Its internal state
> after a re-authentication failure and recovery should never be such that it
> busy-waits.
> Technical Analysis & Root Cause:
> Through remote debugging and thread dumps, the issue traces back to what
> appears to be an unhandled state where the internal heartbeat timers expire
> but are never reset due to the network/auth failure, forcing a 0ms timeout on
> the selector.
> Here is an example stack trace:
> {code:java}
> "main" #1 [3] prio=5 os_prio=0 cpu=174838.97ms elapsed=378.30s tid=0x0000ffffaea028d0 nid=3 runnable [0x0000ffffad52e000]
> java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.EPoll.wait(java.base@.../Native Method)
> at sun.nio.ch.EPollSelectorImpl.doSelect(java.base@.../Unknown Source)
> at sun.nio.ch.SelectorImpl.lockAndDoSelect(java.base@.../Unknown Source)
> - locked <0x00000007187a42e0> (a sun.nio.ch.Util$2)
> - locked <0x00000007187a41c8> (a sun.nio.ch.EPollSelectorImpl)
> at sun.nio.ch.SelectorImpl.selectNow(java.base@.../Unknown Source)
> <--- Due to timeout==0, selectNow() is called - no sleep
> at org.apache.kafka.common.network.Selector.select(Selector.java:878)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:470)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:645)
> (https://github.com/apache/kafka/blob/4.2.0/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L645)
> <--- timeout==0
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:282)
> (https://github.com/apache/kafka/blob/4.2.0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L282)
> <--- calculated pollTimeout is always 0: timer.remainingMs() == 0
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
> at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.pollForFetches(ClassicKafkaConsumer.java:714)
> at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:645)
> (https://github.com/apache/kafka/blob/4.2.0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java#L645)
> <--- This is the poll() loop, running and looping for 1 second before returning
> at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:624)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:899)
> (https://github.com/apache/kafka/blob/4.2.0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L899)
> <--- At this point poll(Duration) is invoked with 1 second duration
> at io.strimzi.examples.consumer.GenericExampleConsumer.main(GenericExampleConsumer.java:115)
> {code}
> KafkaConsumer.poll() is invoked with a 1s duration, which puts a time
> constraint on the ClassicKafkaConsumer.poll() logic, where multiple concerns
> are handled on the calling thread inside a loop that keeps iterating until
> the 1s constraint is reached. While the 100% CPU condition persists, this
> loop never sleeps because the calculated timeout for NetworkClient.poll() is
> always 0.
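> The effect can be demonstrated with plain JDK NIO, independent of Kafka
> (SelectNowDemo below is a standalone sketch, not Kafka code): selectNow()
> never blocks, so a poll loop whose timeout is always 0 becomes a pure
> busy-wait, whereas select(timeoutMs) would sleep while waiting for events.
> {code:java}
> import java.nio.channels.Selector;
>
> // Standalone sketch (plain JDK NIO, no Kafka): selectNow() returns
> // immediately, so driving a loop with it -- as happens when the computed
> // poll timeout is 0 -- burns CPU instead of sleeping.
> public class SelectNowDemo {
>     public static void main(String[] args) throws Exception {
>         try (Selector selector = Selector.open()) {
>             // With no registered channels there is nothing ready, so
>             // selectNow() returns 0 without blocking.
>             int ready = selector.selectNow();
>             System.out.println("selectNow() returned " + ready + " without blocking");
>             // By contrast, selector.select(1000) would block for up to 1000 ms.
>         }
>     }
> }
> {code}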
> It looks like the following is happening (this is my interpretation):
> ClassicKafkaConsumer.poll() asks coordinator.timeToNextPoll() for the sleep
> duration, which evaluates Heartbeat.timeToNextHeartbeat(long). Because the
> heartbeat cannot be sent (due to the auth failure), a successful response is
> never received, and heartbeatTimer.deadlineMs falls entirely into the past
> (in my debugging it was expired by days, as I let the 100% CPU state persist
> for days). Because the Timer's deadlineMs < currentTimeMs, it constantly
> returns 0 remaining time. The main thread passes 0 to NetworkClient.poll(),
> which results in a SelectorImpl.selectNow() call that returns immediately
> without sleeping.
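> The timer arithmetic described above can be sketched as follows (an
> illustrative sketch based on the observed behavior, not the actual
> org.apache.kafka.common.utils.Timer source):
> {code:java}
> // Illustrative sketch, not the real Kafka Timer implementation.
> public class ExpiredTimerSketch {
>     // Remaining time is clamped at zero, so a deadline stuck in the past
>     // yields 0 forever until the timer is reset.
>     public static long remainingMs(long deadlineMs, long currentTimeMs) {
>         return Math.max(0L, deadlineMs - currentTimeMs);
>     }
>
>     // The network poll timeout is bounded by the heartbeat timer's remaining
>     // time; with an expired, never-reset heartbeat timer it is always 0,
>     // which leads straight to selectNow() and no sleep.
>     public static long pollTimeoutMs(long heartbeatRemainingMs, long callerBudgetMs) {
>         return Math.min(heartbeatRemainingMs, callerBudgetMs);
>     }
> }
> {code}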
> The same heartbeatTimer and Selector logic is used in the AsyncKafkaConsumer,
> producing the same inconsistent internal state; it manifests slightly
> differently in terms of per-thread CPU consumption, but it is essentially
> the same condition.
> Workaround:
> One workaround is to handle the AuthenticationException by closing the
> current KafkaConsumer (KafkaConsumer.close()) and creating a new instance for
> the application to continue with. A fresh KafkaConsumer starts with a fresh,
> consistent internal state, so the issue no longer occurs.
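> A minimal sketch of this workaround (KafkaConsumer and
> org.apache.kafka.common.errors.AuthenticationException are the real client
> types; newConsumer(), the "my-topic" name, and the property set are
> placeholder application code):
> {code:java}
> import java.time.Duration;
> import java.util.List;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.errors.AuthenticationException;
>
> // Sketch: on AuthenticationException, close the wedged consumer and build a
> // fresh one so no expired internal heartbeat timer state survives.
> public class RecreateOnAuthFailure {
>     static KafkaConsumer<String, String> newConsumer(Properties props) {
>         KafkaConsumer<String, String> c = new KafkaConsumer<>(props);
>         c.subscribe(List.of("my-topic")); // placeholder topic
>         return c;
>     }
>
>     public static void run(Properties props) {
>         KafkaConsumer<String, String> consumer = newConsumer(props);
>         try {
>             while (true) {
>                 try {
>                     consumer.poll(Duration.ofSeconds(1))
>                             .forEach(r -> System.out.println(r.value()));
>                 } catch (AuthenticationException e) {
>                     // Discard the instance; a fresh one starts with
>                     // consistent timers and no busy-wait.
>                     consumer.close();
>                     consumer = newConsumer(props);
>                 }
>             }
>         } finally {
>             consumer.close();
>         }
>     }
> }
> {code}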
> While the workaround sounds easy, there are messaging frameworks that
> abstract away the Kafka Clients API in order to be pluggable and support
> various messaging libraries behind a common API, for example Quarkus +
> SmallRye. Such frameworks generally let the client application handle
> exceptions, but they do not necessarily make it easy for the application to
> conditionally recreate the underlying library's client object. If such
> recreation is required for a production-level deployment, users are forced to
> abandon simple best-practice code patterns in favor of complex, error-prone
> extra code specific to the underlying messaging library. Therefore, the
> proper place to address the issue is the Kafka Clients library itself.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)