[ https://issues.apache.org/jira/browse/KAFKA-14401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chris Egerton resolved KAFKA-14401.
-----------------------------------
Fix Version/s: 3.9.0
Resolution: Fixed
> Connector/Tasks reading offsets can get stuck if underneath WorkThread dies
> ---------------------------------------------------------------------------
>
> Key: KAFKA-14401
> URL: https://issues.apache.org/jira/browse/KAFKA-14401
> Project: Kafka
> Issue Type: Bug
> Components: connect
> Reporter: Sagar Rao
> Assignee: Sagar Rao
> Priority: Major
> Fix For: 3.9.0
>
>
> When a connector or task reads offsets from the offsets topic, it calls
> `OffsetStorageReaderImpl#offsets`. This method gets a Future from the
> underlying `KafkaOffsetBackingStore`, which invokes `KafkaBasedLog#readToEnd`
> and passes it a Callback. `readToEnd` adds the Callback to a queue of pending
> callbacks. Within KafkaBasedLog, a WorkThread polls this callback queue in an
> infinite loop and executes the callbacks.
> However, the while loop is enclosed in an outer try/catch block. If an
> exception is thrown that none of the inner catch blocks handles, control goes
> to the outermost catch block and the WorkThread terminates. The connectors and
> tasks are not aware of this, so they keep submitting callbacks to
> KafkaBasedLog with nothing left to process them. This is visible in thread
> dumps:
>
> {code:java}
> "task-thread-connector-0" #6334 prio=5 os_prio=0 cpu=19.36ms elapsed=2092.93s tid=0x00007f8d9c037000 nid=0x5d00 waiting on condition [0x00007f8dc08cd000]
>    java.lang.Thread.State: WAITING (parking)
>     at jdk.internal.misc.Unsafe.park(java.base/Native Method)
>     - parking to wait for <0x000000070345c9a8> (a java.util.concurrent.CountDownLatch$Sync)
>     at java.util.concurrent.locks.LockSupport.park(java.base/LockSupport.java:194)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base/AbstractQueuedSynchronizer.java:885)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base/AbstractQueuedSynchronizer.java:1039)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base/AbstractQueuedSynchronizer.java:1345)
>     at java.util.concurrent.CountDownLatch.await(java.base/CountDownLatch.java:232)
>     at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:98)
>     at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:101)
>     at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
> {code}
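To make the failure mode concrete, here is a minimal, self-contained sketch of the pattern described above, under a heavily simplified model. `StuckWorkThreadDemo`, `SimplifiedLog`, `RetriableError` and `readCompletesWithin` are hypothetical names, not Kafka APIs, and the real KafkaBasedLog completes its callbacks around a read of the end offsets rather than handling them one at a time. The point it shows: an exception that escapes the inner catch kills the work thread, and every read submitted afterwards waits with nobody to complete it.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical, heavily simplified model of the work-thread pattern described above.
// It is NOT Kafka's KafkaBasedLog; all names here are illustrative.
public class StuckWorkThreadDemo {

    // Stand-in for an error type that an inner catch block handles and survives.
    static class RetriableError extends RuntimeException {
        RetriableError(String message) { super(message); }
    }

    static class SimplifiedLog {
        // Pending read-to-end requests; only the work thread ever completes them.
        final BlockingQueue<CompletableFuture<Void>> callbacks = new LinkedBlockingQueue<>();
        final Thread workThread;
        // Simulates what reading the end offsets throws; null means every read succeeds.
        private final RuntimeException failure;

        SimplifiedLog(RuntimeException failure) {
            this.failure = failure;
            this.workThread = new Thread(this::run, "work-thread-sketch");
            this.workThread.setDaemon(true);
        }

        void start() {
            workThread.start();
        }

        private void run() {
            try {
                while (true) {
                    CompletableFuture<Void> cb = callbacks.take();
                    try {
                        if (failure != null) {
                            throw failure;
                        }
                        cb.complete(null);
                    } catch (RetriableError e) {
                        // Inner catch: report this error to the caller and keep looping.
                        cb.completeExceptionally(e);
                    }
                }
            } catch (Throwable error) {
                // Outer catch: log and let the thread exit. The callback being processed
                // and everything still queued are never completed.
                System.err.println("Unexpected exception in " + Thread.currentThread().getName() + ": " + error);
            }
        }

        // Caller side, loosely modeled on OffsetStorageReaderImpl#offsets: enqueue, then wait.
        // The thread dump above shows an untimed wait; a timeout is used here only so the demo ends.
        boolean readCompletesWithin(long timeoutMs) throws Exception {
            CompletableFuture<Void> cb = new CompletableFuture<>();
            callbacks.add(cb);
            try {
                cb.get(timeoutMs, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                return false;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        SimplifiedLog healthy = new SimplifiedLog(null);
        healthy.start();
        System.out.println("healthy read completed: " + healthy.readCompletesWithin(1000));

        SimplifiedLog broken = new SimplifiedLog(new IllegalStateException("simulated SSL handshake failure"));
        broken.start();
        System.out.println("first read completed: " + broken.readCompletesWithin(500));
        broken.workThread.join(1000);
        System.out.println("work thread alive: " + broken.workThread.isAlive());
        System.out.println("second read completed: " + broken.readCompletesWithin(500));
    }
}
```

Running `main` should show the healthy read completing while both reads against the broken log time out after its work thread has exited. Without the timeout, the caller would stay parked indefinitely, just like `task-thread-connector-0` in the thread dump above.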
>
> We need a mechanism to fail all such pending offset read requests, because
> even if we restart the thread, it will most likely fail again with the same
> error, and the offset fetch would stay stuck indefinitely.
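One possible shape for such a mechanism, sketched under the assumption that the work thread can record its fatal error before it exits: the outermost catch block fails every pending read with that error, and any read submitted afterwards fails immediately instead of being queued. `FailPendingReadsSketch`, `SimplifiedLog`, `readToEnd` and `onWorkThreadDeath` are hypothetical names (unlike the real `KafkaBasedLog#readToEnd`, this `readToEnd` returns a future instead of taking a Callback), and this is not necessarily how the fix in 3.9.0 is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical sketch of "fail all pending offset reads"; not Kafka's actual fix.
public class FailPendingReadsSketch {

    static class SimplifiedLog {
        private final List<CompletableFuture<Void>> pending = new ArrayList<>(); // guarded by this
        private Throwable fatalError; // guarded by this; set once the work thread has died

        // Called by connectors/tasks. Fails fast once the work thread has died.
        synchronized CompletableFuture<Void> readToEnd() {
            CompletableFuture<Void> cb = new CompletableFuture<>();
            if (fatalError != null) {
                cb.completeExceptionally(fatalError);
            } else {
                pending.add(cb);
            }
            return cb;
        }

        // Called from the work thread's outermost catch block, just before it exits.
        void onWorkThreadDeath(Throwable error) {
            List<CompletableFuture<Void>> toFail;
            synchronized (this) {
                fatalError = error;
                toFail = new ArrayList<>(pending);
                pending.clear();
            }
            // Complete outside the lock so callers' dependent actions don't run while holding it.
            for (CompletableFuture<Void> cb : toFail) {
                cb.completeExceptionally(error);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        SimplifiedLog log = new SimplifiedLog();
        CompletableFuture<Void> before = log.readToEnd();

        // Simulate the work thread hitting an unexpected exception and exiting.
        Thread worker = new Thread(() -> {
            try {
                throw new IllegalStateException("simulated: error while getting end offsets");
            } catch (Throwable t) {
                log.onWorkThreadDeath(t);
            }
        });
        worker.start();
        worker.join();

        CompletableFuture<Void> after = log.readToEnd();
        for (CompletableFuture<Void> f : List.of(before, after)) {
            try {
                f.get();
            } catch (ExecutionException e) {
                System.out.println("read failed fast: " + e.getCause().getMessage());
            }
        }
    }
}
```

Recording the error under the same lock that guards the pending list closes the race where a read is enqueued just after the thread has died. The caller then gets the underlying cause (for example the SSL failure below) instead of hanging.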
> As explained above, this scenario happens mainly when the thrown exception is
> not caught by any of the inner catch blocks and control lands in the
> outermost catch block. I have seen this happen on a few occasions, with the
> following exception:
>
> {code:java}
> [2022-11-20 09:00:59,307] ERROR Unexpected exception in Thread[KafkaBasedLog Work Thread - connect-offsets,5,main] (org.apache.kafka.connect.util.KafkaBasedLog:440)
> org.apache.kafka.connect.errors.ConnectException: Error while getting end offsets for topic 'connect-offsets' on brokers at XXX
>     at org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:695)
>     at org.apache.kafka.connect.util.KafkaBasedLog.readEndOffsets(KafkaBasedLog.java:371)
>     at org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:332)
>     at org.apache.kafka.connect.util.KafkaBasedLog.access$400(KafkaBasedLog.java:75)
>     at org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:406)
> Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
>     at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>     at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>     at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>     at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>     at org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:672)
>     ... 4 more
> {code}
> At this point, once control leaves the [catch block|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L608-L610],
> the WorkThread is dead, and the line `Unexpected exception in` can be found in
> the logs.
> Another example is when the worker is already out of memory (OOM); in such
> cases the work thread dies as well. This is not a great example, because once
> the worker is OOM it cannot make any progress anyway, but I am including it
> for completeness.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)