[
https://issues.apache.org/jira/browse/KAFKA-17263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17870998#comment-17870998
]
Kuradeon commented on KAFKA-17263:
----------------------------------
I assume these two flags in HeartbeatThread should be volatile:
{code:java}
private class HeartbeatThread extends KafkaThread implements AutoCloseable {
private boolean enabled = false;
private boolean closed = false;
//...
}
{code}
Or there is a little chance that the heartbeat thread wasn't able to read
'closed=true' after it was notified by close() method, and continued to the
next codes.
> Sometimes KafkaConsumer.close will block the thread
> ---------------------------------------------------
>
> Key: KAFKA-17263
> URL: https://issues.apache.org/jira/browse/KAFKA-17263
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.8.2
> Environment: Java 8
> Reporter: Kuradeon
> Priority: Major
>
> Sometimes, the thread will be blocked and never continue (state: waiting)
> after invoke KafkaConsumer.close(). Here are the related thread dump:
> {code:java}
> Thread 73 (pool-5-thread-1):
> State: WAITING
> Blocked count: 25
> Waited count: 22
> Waiting on
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread@565a2927
> Stack:
> java.lang.Object.wait(Native Method)
> java.lang.Thread.join(Thread.java:1257)
> java.lang.Thread.join(Thread.java:1331)
>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.closeHeartbeatThread(AbstractCoordinator.java:385)
>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:1010)
>
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:926)
>
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2366)
>
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2333)
>
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2283)
>
> com.ebay.msg.numessage.kafka.client.NuMsgConsumer.close(NuMsgConsumer.java:103)
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> java.lang.reflect.Method.invoke(Method.java:498)
>
> com.ebay.msg.numessage.kafka.client.NuMsgConsumerDelegate$ThreadSafeDelegate.invoke(NuMsgConsumerDelegate.java:146)
> com.sun.proxy.$Proxy192.close(Unknown Source)
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> java.lang.reflect.Method.invoke(Method.java:498)
> Thread 687 (kafka-coordinator-heartbeat-thread |
> numsg-shared-stream-rno.numsg-st-c35):
> State: WAITING
> Blocked count: 0
> Waited count: 1
> Waiting on
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator@51f1e632
> Stack:
> java.lang.Object.wait(Native Method)
> java.lang.Object.wait(Object.java:502)
>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1360)
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)