[ https://issues.apache.org/jira/browse/KAFKA-20253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Marko Štrukelj updated KAFKA-20253:
-----------------------------------
    Description: 
Description:

A consumer using the SASL_OAUTHBEARER authentication mechanism can enter a 
tight spin loop, pinning a CPU core, if it encounters a re-authentication 
failure followed by a successful re-authentication. This can happen due to 
temporary unavailability of the authorization server: a short-lived outage 
during re-login, when the client needs to obtain a new access token, or 
shortly after, when the broker may need the authorization server to validate 
the token.

An idle consumer (one assigned 0 partitions, as happens when a consumer group 
has more consumers than topic partitions) is much more likely to experience 
this issue.

In the ClassicKafkaConsumer the issue shows up as 100% CPU utilization by the 
client application process. In the AsyncKafkaConsumer (KIP-848), the 
application main thread is spared, but the background 
kafka-consumer-network-thread enters a similar loop, with the application 
process utilizing ~40-50% CPU.

Steps to Reproduce:

A reproducer with instructions is available at: 
[GitHub|https://github.com/mstruk/kafka-consumer-reproducer]

Actual Behavior:

The idle consumer (sometimes the working consumer, sometimes both) enters a 
busy-wait loop after a failed re-authentication followed by a successful 
re-authentication (once the authorization server is restored). For the idle 
consumer the spin loop mostly continues indefinitely; the non-idle consumer 
sometimes seems to recover.

Expected Behavior:

The consumer should always sleep, at least briefly, between network polls. The 
internal state after a re-authentication failure and recovery should never be 
such that a busy-wait loop results.
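As a hedged illustration of this expected behavior (the helper name and the 10 ms floor are hypothetical example values, not Kafka's actual internals), the computed poll timeout could be clamped to a small minimum so that an expired internal timer can never drive a zero-timeout busy loop:
{code:java}
public class BoundedPollTimeout {
    // Hypothetical helper: never let an expired timer force a 0 ms
    // network poll. The 10 ms floor is an arbitrary example value.
    static long boundedPollTimeout(long remainingMs, long minBackoffMs) {
        return Math.max(remainingMs, minBackoffMs);
    }

    public static void main(String[] args) {
        // An expired timer reports 0 remaining; the floor still yields a sleep.
        System.out.println(boundedPollTimeout(0, 10));   // 10
        System.out.println(boundedPollTimeout(250, 10)); // 250
    }
}
{code}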

Technical Analysis & Root Cause:

Through remote debugging and thread dumps, the issue traces back to what 
appears to be an unhandled state where the internal heartbeat timers expire but 
are never reset due to the network/auth failure, forcing a 0ms timeout on the 
selector.

Here is an example stack trace:
{code:java}
"main" #1 [3] prio=5 os_prio=0 cpu=174838.97ms elapsed=378.30s 
tid=0x0000ffffaea028d0 nid=3 runnable  [0x0000ffffad52e000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPoll.wait([email protected]/Native Method)
        at sun.nio.ch.EPollSelectorImpl.doSelect([email protected]/Unknown 
Source)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect([email protected]/Unknown 
Source)
        - locked <0x00000007187a42e0> (a sun.nio.ch.Util$2)
        - locked <0x00000007187a41c8> (a sun.nio.ch.EPollSelectorImpl)

        at sun.nio.ch.SelectorImpl.selectNow([email protected]/Unknown Source)  
  <--- Due to timeout==0 selectNow() is called - no sleep

        at org.apache.kafka.common.network.Selector.select(Selector.java:878)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:470)

        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:645)  
  
(https://github.com/apache/kafka/blob/4.2.0/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L645)
    <--- timeout==0

        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:282)
     
(https://github.com/apache/kafka/blob/4.2.0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L282)

calculated pollTimeout is always 0
timer.remainingMs() == 0

        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
        at 
org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.pollForFetches(ClassicKafkaConsumer.java:714)

        at 
org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:645)
   
(https://github.com/apache/kafka/blob/4.2.0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java#L645)
    <--- This is the poll() loop, running and looping for 1 second before 
returning 

        at 
org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:624)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:899)    
(https://github.com/apache/kafka/blob/4.2.0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L899)
    <--- At this point poll(Duration) is invoked with 1 second duration 

        at 
io.strimzi.examples.consumer.GenericExampleConsumer.main(GenericExampleConsumer.java:115)
{code}
KafkaConsumer.poll() is invoked with a 1 s duration, which puts a time 
constraint on the ClassicKafkaConsumer.poll() logic, where multiple concerns 
are handled on the calling thread inside a loop that keeps iterating until the 
1 s constraint is reached. While the 100% CPU condition persists, this loop 
never sleeps, because the timeout calculated for NetworkClient.poll() is 
always 0.

It looks like the following is happening (this is my interpretation):
ClassicKafkaConsumer.poll() asks coordinator.timeToNextPoll() for the sleep 
duration, which evaluates Heartbeat.timeToNextHeartbeat(long). Because the 
heartbeat cannot be sent (due to the auth failure), a successful response is 
never received, and heartbeatTimer.deadlineMs falls entirely into the past (in 
my debugging it was expired by days, as I let the 100% CPU state persist for 
days). Because the Timer's deadlineMs < currentTimeMs, it constantly returns 
0. The main thread passes 0 to NetworkClient.poll(), which results in a 
SelectorImpl.selectNow() call, which returns immediately without sleeping.
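The chain above can be sketched with a minimal, self-contained model (the class and method here are stand-ins for Kafka's internal Timer and poll loop, not the actual implementation):
{code:java}
import java.nio.channels.Selector;

public class SpinLoopSketch {
    // Models Kafka's internal Timer: once deadlineMs is in the past,
    // remainingMs() stays pinned at 0 until the timer is reset.
    static long remainingMs(long deadlineMs, long currentTimeMs) {
        return Math.max(0, deadlineMs - currentTimeMs);
    }

    public static void main(String[] args) throws Exception {
        // Deadline expired long ago and is never reset due to the auth failure.
        long deadlineMs = System.currentTimeMillis() - 86_400_000L;
        try (Selector selector = Selector.open()) {
            for (int i = 0; i < 3; i++) {
                long timeout = remainingMs(deadlineMs, System.currentTimeMillis());
                // timeout is always 0, so the selector never blocks: busy-wait.
                if (timeout == 0) {
                    selector.selectNow();
                } else {
                    selector.select(timeout);
                }
                System.out.println("poll timeout = " + timeout); // always 0
            }
        }
    }
}
{code}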

The same heartbeatTimer/Selector logic is used in the AsyncKafkaConsumer, 
resulting in the same inconsistent internal state; it manifests slightly 
differently in per-thread CPU consumption, but it is essentially the same 
condition.

Workaround:

One workaround is to handle the AuthenticationException by closing the current 
KafkaConsumer (KafkaConsumer.close()) and creating a new instance for the 
application to continue with. A fresh KafkaConsumer comes with a fresh, 
consistent internal state, so the issue disappears.
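A minimal sketch of this close-and-recreate pattern follows; FlakyConsumer and AuthFailure are stand-ins for KafkaConsumer and org.apache.kafka.common.errors.AuthenticationException so the example stays self-contained:
{code:java}
public class RecreateOnAuthFailure {
    // Stand-in for AuthenticationException.
    static class AuthFailure extends RuntimeException {}

    // Stand-in for KafkaConsumer: a failing instance throws on poll().
    static class FlakyConsumer {
        private final boolean failing;
        FlakyConsumer(boolean failing) { this.failing = failing; }
        String poll() {
            if (failing) throw new AuthFailure();
            return "records";
        }
        void close() { /* release sockets, buffers, ... */ }
    }

    // On auth failure, discard the consumer (broken internal timers and
    // all) and continue with a fresh instance that has consistent state.
    static String pollWithRecovery(FlakyConsumer consumer) {
        try {
            return consumer.poll();
        } catch (AuthFailure e) {
            consumer.close();
            FlakyConsumer fresh = new FlakyConsumer(false);
            return fresh.poll();
        }
    }

    public static void main(String[] args) {
        System.out.println(pollWithRecovery(new FlakyConsumer(true))); // records
    }
}
{code}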

While the workaround sounds easy, there are messaging frameworks that abstract 
away the Kafka Clients API in order to be pluggable and support various 
messaging libraries behind a common API (Quarkus + SmallRye, for example). It 
is generally the policy of such frameworks to let the client application 
handle exceptions, but they do not necessarily make it easy for the 
application to conditionally recreate the underlying library's client object. 
If such recreation is absolutely necessary for production deployments, users 
are forced to abandon simple best-practice code patterns and write complex, 
error-prone extra code that is specific to the underlying messaging library. 
Therefore, the proper place to address the issue is within the Kafka Clients 
library itself.



> High CPU loop on consumer after failed re-authentication
> --------------------------------------------------------
>
>                 Key: KAFKA-20253
>                 URL: https://issues.apache.org/jira/browse/KAFKA-20253
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, security
>    Affects Versions: 4.0.0, 4.2.0
>         Environment: Kafka Client Version: 4.2.0, it can be reproduced with 
> 4.0.0 as well. Did not try other versions.
>            Reporter: Marko Štrukelj
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
