[
https://issues.apache.org/jira/browse/KAFKA-18355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ravi Gupta updated KAFKA-18355:
-------------------------------
Description:
We are running a Kafka Streams based application in production and have noticed
a couple of times that {*}lag on a source topic partition starts increasing{*}.
Based on our investigation, we found the following happening:
* The thread responsible for the partition's task gets an authentication
exception (MSK IAM authentication throws a transient exception) while producing
a record to the sink:
{code:java}
{
"level":"ERROR",
"logger_name":"org.apache.kafka.clients.NetworkClient",
"message":"[Producer
clientId=xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-producer,
transactionalId=xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-3]
Connection to node 1
(b-1.xxxxxxx.yyyyyy.c2.kafka.xx-yyyyy.amazonaws.com/xx.xx.xxx.xxxx:yyyy) failed
authentication due to: An error: (java.security.PrivilegedActionException:
javax.security.sasl.SaslException: Failed to find AWS IAM Credentials [Caused
by com.amazonaws.AmazonServiceException: Unauthorized (Service: null; Status
Code: 401; Error Code: null; Request ID: null; Proxy: null)]) occurred when
evaluating SASL token received from the Kafka Broker. Kafka Client will go to
AUTHENTICATION_FAILED state.",
"thread_name":"kafka-producer-network-thread |
xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-producer",
"time":"2024-12-26T07:40:45.113067247Z"
} {code}
* In some cases, the system recovers when the next record is polled and the
sink node (RecordCollectorImpl) rethrows the exception from the last message
while processing.
* However, in a couple of cases the following log appears approximately 5
minutes after the producer failure. ({_}No additional log statement explains
why the thread stopped polling; however, it seems the heartbeat thread got the
same exception as the producer.{_})
{code:java}
{
"level":"WARN",
"logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
"message":"[Consumer
clientId=xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-consumer,
groupId=xxxxx-xxxx-lall-lio-step-executor_lio-se] consumer poll timeout has
expired. This means the time between subsequent calls to poll() was longer than
the configured max.poll.interval.ms, which typically implies that the poll loop
is spending too much time processing messages. You can address this either by
increasing max.poll.interval.ms or by reducing the maximum size of batches
returned in poll() with max.poll.records.",
"thread_name":"kafka-coordinator-heartbeat-thread |
xxxxx-xxxx-lall-lio-step-executor_lio-se",
"time":"2024-12-26T07:45:43.286428901Z"
} {code}
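For context, the two settings this warning mentions can be overridden on the Streams-managed consumer via the consumer-prefixed config keys. A minimal sketch with illustrative values only (this is not a fix for the bug described here, where the thread never returned to poll() at all; note the ~5-minute gap between the two log timestamps matches the consumer's 300000 ms default for max.poll.interval.ms):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class PollTuning {
    public static Properties withPollTuning(Properties props) {
        // Give each poll loop more headroom before the group coordinator
        // considers this member failed (consumer default: 300000 ms).
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
                  600_000);
        // Smaller batches mean each call to poll() returns sooner.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
                  100);
        return props;
    }
}
```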
* In such cases, the partition gets assigned to a new thread (StreamThread-5);
however, the new thread keeps throwing the following exception:
{code:java}
{
"level":"INFO",
"logger_name":"org.apache.kafka.streams.processor.internals.TaskManager",
"message":"stream-thread
[xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5]
Encountered lock exception. Reattempting locking the state in the next
iteration.",
"stack_trace":"org.apache.kafka.streams.errors.LockException: stream-thread
[xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5]
task [8_0] Failed to lock the state directory for task 8_0\n\tat
org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:96)\n\tat
org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)\n\tat
org.apache.kafka.streams.processor.internals.TaskManager.addTaskToStateUpdater(TaskManager.java:1010)\n\tat
org.apache.kafka.streams.processor.internals.TaskManager.addTasksToStateUpdater(TaskManager.java:997)\n\tat
org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:911)\n\tat
org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1188)\n\tat
org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:996)\n\tat
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711)\n\tat
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)\n",
"thread_name":"xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5",
"time":"2024-12-26T07:50:53.904374419Z"
} {code}
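The lock the new thread keeps failing to acquire behaves like a non-blocking try-lock attempted once per run-loop iteration. A minimal sketch of that retry pattern using a plain file lock (an illustration of the semantics only, not the actual Streams StateDirectory internals; file names are hypothetical):

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class StateDirLockSketch {
    // Non-blocking lock attempt on the task's state directory marker file.
    // Returns null while another holder keeps the lock, mirroring how the new
    // stream thread logs "Encountered lock exception" and retries next iteration.
    static FileLock tryLock(FileChannel channel) throws IOException {
        return channel.tryLock();
    }

    public static void main(String[] args) throws IOException {
        Path lockFile = Path.of(System.getProperty("java.io.tmpdir"), "task-8_0.lock");
        try (FileChannel ch = FileChannel.open(lockFile,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            FileLock lock = tryLock(ch);
            // Succeeds here because no one else holds it; in the bug report the
            // dead thread never released it, so every retry returned "held".
            System.out.println(lock != null);
            lock.release();
        }
    }
}
```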
* We have configured an exception handler; however, in these failure cases the
handler is not called for either the producer or the consumer exception. For
some other authentication exceptions during consume/produce we do see the
handler being called.
It seems the old thread never cleaned up its state: the producer failure is
normally surfaced while processing the next event, which never happened here
because of the consumer exception, and the consumer failure path never released
the state directory lock.
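For reference, this is roughly how such handlers are wired up (the config key, handler types, and `setUncaughtExceptionHandler` are public Kafka Streams API; the topology and properties here are placeholders). In the failure described above, neither path was invoked:

```java
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

public class HandlerSetup {
    public static KafkaStreams build(Topology topology, Properties props) {
        // Producer-side send errors are routed to the production exception
        // handler (swap in a custom implementation as needed).
        props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                  DefaultProductionExceptionHandler.class);
        KafkaStreams streams = new KafkaStreams(topology, props);
        // Fatal stream-thread errors reach this handler; REPLACE_THREAD spawns
        // a replacement thread after the dying one cleans up its tasks.
        streams.setUncaughtExceptionHandler(
                ex -> StreamThreadExceptionResponse.REPLACE_THREAD);
        return streams;
    }
}
```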
> Stream thread blocks indefinitely for acquiring state directory lock
> --------------------------------------------------------------------
>
> Key: KAFKA-18355
> URL: https://issues.apache.org/jira/browse/KAFKA-18355
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.8.1
> Reporter: Ravi Gupta
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)