[
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17021795#comment-17021795
]
Jun Rao commented on KAFKA-8532:
--------------------------------
[~lbdai3190] : To verify this, I wrote the following short program. I created
an instance of KafkaZkClient and registered a StateChangeHandler that blocks in
beforeInitializingSession().
{code:java}
package kafka.tools

import kafka.zk.KafkaZkClient
import kafka.zookeeper.StateChangeHandler
import org.apache.kafka.common.utils.Time

object Mytest {
  def main(args: Array[String]): Unit = {
    // 6000 ms session and connection timeouts, at most 10 in-flight requests
    val kafkaZkClient = KafkaZkClient("localhost:2181", false, 6000, 6000, 10,
      Time.SYSTEM, "mytest")
    kafkaZkClient.registerStateChangeHandler(new StateChangeHandler {
      override val name: String = "mytest"

      override def afterInitializingSession(): Unit = {
        throw new IllegalStateException
      }

      // Simulate a handler that never returns while the session is re-initialized
      override def beforeInitializingSession(): Unit = {
        Thread.sleep(Integer.MAX_VALUE) // effectively blocks forever
      }
    })

    println("zookeeper client state: " + kafkaZkClient.currentZooKeeper.getState)
    // Window for pausing the process with "kill -STOP" until the session expires
    Thread.sleep(20000)
    try {
      val children = kafkaZkClient.getChildren("/")
      println("child nodes are " + children)
    } catch {
      case t: Throwable =>
        println("hit exception " + t)
    }
    println("zookeeper client state: " + kafkaZkClient.currentZooKeeper.getState)
  }
}
{code}
I then started a ZooKeeper server and ran the above program
(bin/kafka-run-class.sh kafka.tools.Mytest). Once the ZooKeeper client had
reached the CONNECTED state (but before the 20-second sleep completed), I ran
"kill -STOP" to pause the program. I waited another 6 seconds for the ZK
session to expire and then ran "kill -CONT" to resume the program. This is the
output I got:
{code:java}
zookeeper client state: CONNECTED
[2020-01-22 21:47:01,693] WARN Client session timed out, have not heard from server in 17038ms for sessionid 0x1002ced93e60000 (org.apache.zookeeper.ClientCnxn)
[2020-01-22 21:47:03,351] WARN Unable to reconnect to ZooKeeper service, session 0x1002ced93e60000 has expired (org.apache.zookeeper.ClientCnxn)
hit exception org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired for /
zookeeper client state: CLOSED
{code}
As you can see, this program verifies a few things. (1) When a ZK session
expires, the ZK client's state transitions to CLOSED (not CONNECTING). (2) If
ZooKeeperClient.handleRequests() is called after the ZK session has expired,
the call doesn't block; it returns a Session expired error code. (3) Even if
beforeInitializingSession() blocks, ZooKeeperClient.handleRequests() does not
block on an expired session.
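Findings (2) and (3) come down to how a latch-based request call behaves once the handle is closed. The following is a minimal Scala sketch under stated assumptions and is not ZooKeeperClient's actual code. `ExpiredSessionModel`, `Client`, `closed`, `Ok` and `SessionExpired` are hypothetical names, and a single-thread executor stands in for the ZooKeeper event thread. The caller blocks on a CountDownLatch until every response callback has fired. Because a closed handle completes each request immediately with an error, the latch reaches zero and the call returns. This does not depend on any thread that may be stuck in a session-initialization callback.

```scala
import java.util.concurrent.{CountDownLatch, Executors}

// Hypothetical model of a latch-based request pipeline, NOT ZooKeeperClient itself:
// handleRequests blocks on a CountDownLatch until every response callback has fired.
object ExpiredSessionModel {
  sealed trait Code
  case object Ok extends Code
  case object SessionExpired extends Code

  final class Client {
    // Stand-in for the ZooKeeper handle's state; true once the session has expired.
    @volatile var closed: Boolean = false

    def handleRequests(paths: Seq[String]): List[Code] = {
      val latch = new CountDownLatch(paths.size)
      val results = Array.fill[Code](paths.size)(Ok)
      val callbacks = Executors.newSingleThreadExecutor() // plays the ZK event thread
      try {
        paths.indices.foreach { i =>
          callbacks.execute(new Runnable {
            override def run(): Unit = {
              // A closed handle completes the request right away with an error code
              // instead of waiting for a reconnect that will never happen.
              results(i) = if (closed) SessionExpired else Ok
              latch.countDown()
            }
          })
        }
        latch.await() // returns once every callback has counted down
        results.toList
      } finally callbacks.shutdown()
    }
  }
}
```

In this model, setting `closed = true` and then calling `handleRequests(Seq("/", "/brokers"))` returns `List(SessionExpired, SessionExpired)` instead of parking.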
> controller-event-thread deadlock with zk-session-expiry-handler0
> ----------------------------------------------------------------
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 2.1.1
> Reporter: leibo
> Priority: Blocker
> Attachments: controllerDeadLockAnalysis-2020-01-20.png, js.log,
> js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and
> zk-session-expiry-handler0 threads. When this issue occurs, the only way to
> recover the Kafka cluster is to restart the Kafka server. The following is the
> jstack output for the controller-event-thread and zk-session-expiry-handler0 threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 tid=0x00007fcc9c010000 nid=0xfb22 waiting on condition [0x00007fcbb01f8000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00000005ee3f7000> (a java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // waiting for controller-event-thread to process the expire event
> at kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
> at kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
> at kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
> at kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
> at kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
> at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown Source)
> at scala.collection.Iterator.foreach(Iterator.scala:937)
> at scala.collection.Iterator.foreach$(Iterator.scala:937)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
> at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
> at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
> at kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
> at kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown Source)
> at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
> at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown Source)
> at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Locked ownable synchronizers:
> - <0x0000000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
>
> "controller-event-thread" #51 prio=5 os_prio=0 tid=0x00007fceaeec4000 nid=0x310 waiting on condition [0x00007fccb55c8000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00000005d1be5a00> (a java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
> at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:157)
> at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1596)
> at kafka.zk.KafkaZkClient.kafka$zk$KafkaZkClient$$retryRequestUntilConnected(KafkaZkClient.scala:1589)
> at kafka.zk.KafkaZkClient.deletePreferredReplicaElection(KafkaZkClient.scala:989)
> at kafka.controller.KafkaController.removePartitionsFromPreferredReplicaElection(KafkaController.scala:873)
> at kafka.controller.KafkaController.kafka$controller$KafkaController$$onPreferredReplicaElection(KafkaController.scala:631)
> at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:266)
> at kafka.controller.KafkaController.kafka$controller$KafkaController$$elect(KafkaController.scala:1221)
> at kafka.controller.KafkaController$Reelect$.process(KafkaController.scala:1508)
> at kafka.controller.KafkaController$RegisterBrokerAndReelect$.process(KafkaController.scala:1517)
> at kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:89)
> at kafka.controller.ControllerEventManager$ControllerEventThread$$Lambda$362/918424570.apply$mcV$sp(Unknown Source)
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
> at kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:89)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Locked ownable synchronizers:
> - None
>
> This issue is the same as https://issues.apache.org/jira/browse/KAFKA-6879. I
> suspect that KAFKA-6879 was not completely fixed; please pay attention to
> this issue.
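The cycle the reporter suspects can be reduced to two latches. The sketch below is a hypothetical Scala model, not Kafka's implementation. The thread names mirror the jstack, but `WaitCycleModel`, `expireProcessed` and the `failFastOnExpiredSession` flag are invented for illustration. The flag stands for the behavior shown in the comment above, where requests on an expired session fail instead of waiting.

With the flag off, the controller role waits for a session re-initialization. The expiry-handler role cannot perform that re-initialization until the controller starts the Expire event, so neither thread makes progress. With the flag on, the Expire event is processed.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}

// Hypothetical reduction of the wait-for cycle suggested by the two jstack traces.
// Thread roles and latch names are illustrative; this is not Kafka's implementation.
object WaitCycleModel {
  // Starts a daemon thread so parked model threads never keep the JVM alive.
  private def daemon(name: String)(body: => Unit): Unit = {
    val t = new Thread(new Runnable { override def run(): Unit = body }, name)
    t.setDaemon(true)
    t.start()
  }

  /** True if the Expire event was fully processed within timeoutMs. */
  def expireProcessed(failFastOnExpiredSession: Boolean, timeoutMs: Long): Boolean = {
    val events = new LinkedBlockingQueue[String]()
    val expireStarted = new CountDownLatch(1)        // cf. Expire.waitUntilProcessingStarted
    val sessionReinitialized = new CountDownLatch(1) // counted down after re-initialization
    val expireDone = new CountDownLatch(1)

    events.put("RegisterBrokerAndReelect") // already queued when the session expires

    // Role of controller-event-thread: processes one event at a time, in FIFO order.
    daemon("controller-event-thread") {
      var running = true
      while (running) {
        events.take() match {
          case "RegisterBrokerAndReelect" =>
            // Role of handleRequests: fail fast on an expired session, or (the
            // reporter's hypothesis) wait until the session has been re-initialized.
            if (!failFastOnExpiredSession) sessionReinitialized.await()
          case "Expire" =>
            expireStarted.countDown()
            expireDone.countDown()
            running = false
          case other =>
            throw new IllegalArgumentException(s"unexpected event: $other")
        }
      }
    }

    // Role of zk-session-expiry-handler0: beforeInitializingSession() enqueues Expire
    // and waits for the controller to start it; only then is the session re-initialized.
    daemon("zk-session-expiry-handler0") {
      events.put("Expire")
      expireStarted.await()
      sessionReinitialized.countDown()
    }

    // Only this observer uses a timeout, so the demo terminates even when the model deadlocks.
    expireDone.await(timeoutMs, TimeUnit.MILLISECONDS)
  }
}
```

In this model, `expireProcessed(failFastOnExpiredSession = false, timeoutMs = 200L)` returns false because both role threads stay parked. With the flag set to true, it returns true well within the timeout.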
--
This message was sent by Atlassian Jira
(v8.3.4#803005)