[
https://issues.apache.org/jira/browse/KAFKA-6043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ramkumar updated KAFKA-6043:
----------------------------
Attachment: stdout.LRMIID-158150.zip
Attaching the complete thread dump with this file .
> Kafka 8.1.1 -
> .ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110)
> blocked
> ------------------------------------------------------------------------------------------------
>
> Key: KAFKA-6043
> URL: https://issues.apache.org/jira/browse/KAFKA-6043
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 0.8.1.1
> Reporter: Ramkumar
> Attachments: 6043.v1, stdout.LRMIID-158150.zip
>
>
> we are using 3 node kafka cluster. Around this kafka cluster we have a
> RESTful service which provides http APIs for client. This service maintains
> the consumer connection in cache. And this cache is set to expire in 60
> minutes after which , the consumer connection will get disconnected.
> But we see this zookeeperconnection thread is blocked and the consumer object
> is still hanging in jvm. Can you pls let me know if there is any solution
> identified for this
> Below is the output from thread dump when this occurred
> pool-25100-thread-1" prio=10 tid=0x00007f711804e820 nid=0x6726 waiting for
> monitor entry [0x00007f6cd999b000]
> java.lang.Thread.State: BLOCKED (on object monitor)
> at
> kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161)
> - waiting to lock <0x000000076a922c40> (a java.lang.Object)
> at
> kafka.javaapi.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110)
> at
> com.att.nsa.cambria.backends.kafka.KafkaConsumer$2.call(KafkaConsumer.java:207)
> at
> com.att.nsa.cambria.backends.kafka.KafkaConsumer$2.call(KafkaConsumer.java:201)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> T--APPC-LCM-READ-E2E_T2_watcher_executor" prio=10 tid=0x00007f7180198030
> nid=0x55a2 waiting on condition [0x00007f6c9f4fa000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x000000076b2e1508> (a
> java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
> at
> kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
> at
> kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
> at
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
> at
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
> at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at
> kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
> - locked <0x000000076a922340> (a java.lang.Object)
> at
> kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
> at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala
> :524)
> at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:562)
> at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:457)
> at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:408)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
> at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:402)
> - locked <0x000000076a922c40> (a java.lang.Object)
> at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:355)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)