[ 
https://issues.apache.org/jira/browse/KAFKA-20253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-20253:
------------------------------
    Component/s: consumer

> High CPU loop on consumer after failed re-authentication
> --------------------------------------------------------
>
>                 Key: KAFKA-20253
>                 URL: https://issues.apache.org/jira/browse/KAFKA-20253
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer, security
>    Affects Versions: 4.0.0, 4.2.0
>         Environment: Kafka Client Version: 4.2.0, it can be reproduced with 
> 4.0.0 as well. Did not try other versions.
>            Reporter: Marko Štrukelj
>            Priority: Major
>
> Description:
> A consumer can enter a tight spin loop, pinning a CPU core, if a 
> re-authentication failure is followed by a successful re-authentication while 
> using the SASL OAUTHBEARER authentication mechanism. This can happen when the 
> authorization server is temporarily unavailable, even briefly: either during 
> re-login, when the client needs to obtain a new access token, or shortly 
> after, when the broker may need the authorization server to validate the 
> token.
> An idle consumer (one assigned 0 partitions because the consumer group has 
> more consumers than topic partitions) is much more likely to experience this 
> issue.
> In the ClassicKafkaConsumer the issue shows as 100% CPU utilization by the 
> client application process. In the AsyncKafkaConsumer (KIP-848), the 
> application main thread is spared, but the background 
> kafka-consumer-network-thread enters a similar loop, with the application 
> process using ~40-50% CPU.
> Steps to Reproduce:
> A reproducer with instructions is available at: 
> [GitHub|https://github.com/mstruk/kafka-consumer-reproducer]
> Actual Behavior:
> The idle consumer (sometimes the working consumer, sometimes both) enters a 
> busy-wait loop after a failed re-authentication followed by a successful 
> re-authentication (after the authorization server is restored). For the idle 
> consumer the spin loop mostly continues indefinitely. A non-idle consumer 
> sometimes seems to recover.
> Expected Behavior:
> The consumer should always block or sleep for some non-zero interval when it 
> has no work to do. The internal state after a re-authentication failure and 
> recovery should never be such that there is a busy-wait loop.
> Technical Analysis & Root Cause:
> Through remote debugging and thread dumps, the issue traces back to what 
> appears to be an unhandled state where the internal heartbeat timers expire 
> but are never reset due to the network/auth failure, forcing a 0ms timeout on 
> the selector.
> Here is an example stack trace:
> {code:java}
> "main" #1 [3] prio=5 os_prio=0 cpu=174838.97ms elapsed=378.30s 
> tid=0x0000ffffaea028d0 nid=3 runnable  [0x0000ffffad52e000]
>    java.lang.Thread.State: RUNNABLE
>       at sun.nio.ch.EPoll.wait([email protected]/Native Method)
>       at sun.nio.ch.EPollSelectorImpl.doSelect([email protected]/Unknown Source)
>       at sun.nio.ch.SelectorImpl.lockAndDoSelect([email protected]/Unknown Source)
>       - locked <0x00000007187a42e0> (a sun.nio.ch.Util$2)
>       - locked <0x00000007187a41c8> (a sun.nio.ch.EPollSelectorImpl)
>       at sun.nio.ch.SelectorImpl.selectNow([email protected]/Unknown Source)
>           <--- Due to timeout==0 selectNow() is called - no sleep
>       at org.apache.kafka.common.network.Selector.select(Selector.java:878)
>       at org.apache.kafka.common.network.Selector.poll(Selector.java:470)
>       at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:645)
>           (https://github.com/apache/kafka/blob/4.2.0/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L645)
>           <--- timeout==0
>       at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:282)
>           (https://github.com/apache/kafka/blob/4.2.0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L282)
>           <--- calculated pollTimeout is always 0; timer.remainingMs() == 0
>       at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
>       at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.pollForFetches(ClassicKafkaConsumer.java:714)
>       at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:645)
>           (https://github.com/apache/kafka/blob/4.2.0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java#L645)
>           <--- This is the poll() loop, running and looping for 1 second before returning
>       at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:624)
>       at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:899)
>           (https://github.com/apache/kafka/blob/4.2.0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L899)
>           <--- At this point poll(Duration) is invoked with 1 second duration
>       at io.strimzi.examples.consumer.GenericExampleConsumer.main(GenericExampleConsumer.java:115)
> {code}
> KafkaConsumer.poll() is invoked with a 1s duration, which puts a time 
> constraint on the ClassicKafkaConsumer.poll() logic, where multiple concerns 
> are handled on the calling thread inside a loop that keeps iterating until 
> the 1s time constraint is reached. While the 100% CPU condition persists, 
> this loop never sleeps because the calculated timeout for 
> NetworkClient.poll() is always 0.
> It looks like the following is happening (this is my interpretation):
> The ClassicKafkaConsumer.poll() method asks coordinator.timeToNextPoll() for 
> the sleep duration. This evaluates Heartbeat.timeToNextHeartbeat(long). 
> Because the heartbeat cannot be sent (due to the auth failure), a successful 
> response is never received. The heartbeatTimer.deadlineMs falls entirely into 
> the past (in my debugging, it was expired by days because I let the 100% CPU 
> state persist for days). Because the Timer's deadlineMs < currentTimeMs, it 
> constantly returns 0. The main thread passes 0 to NetworkClient.poll(), which 
> results in a SelectorImpl.selectNow() call, which returns immediately without 
> sleeping.
> The same heartbeatTimer and Selector logic is used in AsyncKafkaConsumer, 
> resulting in the same inconsistent internal state. It manifests slightly 
> differently in terms of per-thread CPU consumption, but it is essentially 
> the same condition.
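
The zero-timeout path described above can be illustrated with a small standalone sketch. It does not use any Kafka classes: `ExpiredDeadlineSketch`, `remainingMs` and `select` are hypothetical stand-ins, and the clamping formula is an assumption based on the description in this report (a timer whose deadline lies in the past reports 0). It only shows that a stale deadline yields 0 on every iteration, and that java.nio's Selector.selectNow() returns without blocking while select(timeout) with a positive timeout blocks.

```java
import java.io.IOException;
import java.nio.channels.Selector;

public class ExpiredDeadlineSketch {

    // Hypothetical stand-in for a deadline-based timer: the remaining time is
    // clamped at 0 once the deadline lies in the past (assumed behavior).
    static long remainingMs(long deadlineMs, long nowMs) {
        return Math.max(0L, deadlineMs - nowMs);
    }

    // Mirrors the dispatch annotated in the stack trace: a timeout of 0 leads
    // to selectNow() (no blocking), any positive value to select(timeout).
    static int select(Selector selector, long timeoutMs) throws IOException {
        return timeoutMs == 0L ? selector.selectNow() : selector.select(timeoutMs);
    }

    public static void main(String[] args) throws IOException {
        // A deadline that expired one day ago and is never reset.
        long staleDeadlineMs = System.currentTimeMillis() - 86_400_000L;

        try (Selector selector = Selector.open()) {
            // With the stale deadline, every iteration computes timeout == 0,
            // so the loop spins without ever sleeping.
            long iterations = 0;
            long spinUntil = System.nanoTime() + 50_000_000L; // roughly 50 ms
            while (System.nanoTime() < spinUntil) {
                long timeoutMs = remainingMs(staleDeadlineMs, System.currentTimeMillis());
                select(selector, timeoutMs);
                iterations++;
            }
            System.out.println("iterations in ~50 ms with a stale deadline: " + iterations);

            // For comparison, a positive timeout lets the thread block.
            long start = System.nanoTime();
            select(selector, 20L);
            long elapsedMs = (System.nanoTime() - start) / 1_000_000L;
            System.out.println("one select(20) call took about " + elapsedMs + " ms");
        }
    }
}
```

The iteration count printed for the first loop depends on the machine; the point is only that it is bounded by CPU speed rather than by any sleep.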
> Workaround:
> One workaround for this issue is to handle the AuthenticationException 
> carefully: close the current KafkaConsumer (KafkaConsumer.close()) and create 
> a new instance so that the application can continue. A fresh KafkaConsumer 
> comes with a fresh, consistent internal state, so the issue no longer occurs.
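
A minimal sketch of this close-and-recreate pattern follows. To stay self-contained it avoids a dependency on kafka-clients: `RecreatingClient`, `FakeClient` and the use of SecurityException as the "auth failure" are all hypothetical placeholders. In a real application the type parameter would presumably be a KafkaConsumer, the factory would construct and subscribe a new consumer, and the predicate would test for org.apache.kafka.common.errors.AuthenticationException.

```java
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

public class RecreatingClient<C extends AutoCloseable> {
    private final Supplier<C> factory;                     // builds a fresh client
    private final Predicate<RuntimeException> isAuthFailure;
    private C client;
    private int recreations;

    public RecreatingClient(Supplier<C> factory, Predicate<RuntimeException> isAuthFailure) {
        this.factory = factory;
        this.isAuthFailure = isAuthFailure;
        this.client = factory.get();
    }

    // Runs one call against the current client. On an auth failure the client
    // is closed and replaced, then the exception is rethrown so the caller can
    // decide when to retry; the next call uses the fresh instance.
    public <R> R call(Function<C, R> action) {
        try {
            return action.apply(client);
        } catch (RuntimeException e) {
            if (isAuthFailure.test(e)) {
                recreate();
            }
            throw e;
        }
    }

    private void recreate() {
        try {
            client.close();
        } catch (Exception closeError) {
            // Best effort: the old instance is discarded either way.
        }
        client = factory.get();
        recreations++;
    }

    public int recreations() {
        return recreations;
    }

    // Hypothetical stand-in for a consumer, used only for the demo below.
    public static final class FakeClient implements AutoCloseable {
        private boolean closed;

        public String poll(boolean failAuth) {
            if (failAuth) {
                throw new SecurityException("simulated re-authentication failure");
            }
            return "records";
        }

        public boolean isClosed() {
            return closed;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    public static void main(String[] args) {
        RecreatingClient<FakeClient> wrapper =
                new RecreatingClient<>(FakeClient::new, e -> e instanceof SecurityException);
        try {
            wrapper.call(c -> c.poll(true));
        } catch (SecurityException e) {
            System.out.println("auth failure handled: " + e.getMessage());
        }
        System.out.println("next poll returned: " + wrapper.call(c -> c.poll(false)));
        System.out.println("recreations: " + wrapper.recreations());
    }
}
```

Rethrowing instead of retrying inside `call` is a design choice of this sketch: it keeps the wrapper from looping on its own while the authorization server is still unavailable.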
> While the workaround sounds easy, there are messaging frameworks that 
> abstract away the Kafka Clients API in order to be pluggable and to support 
> various messaging libraries through a common API, Quarkus + SmallRye for 
> example. It is generally the policy of such frameworks to let the client 
> application handle exceptions, but they do not necessarily make it easy for 
> the client application to conditionally recreate the underlying library's 
> client object. If such recreation is absolutely necessary for a 
> production-level deployment, users are forced to abandon simple 
> best-practice code patterns in favor of complex, error-prone code that is 
> specific to the underlying messaging library. Therefore, the proper way to 
> address the issue would be within the Kafka Clients library itself.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
