[ https://issues.apache.org/jira/browse/KAFKA-7280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681132#comment-16681132 ]
sachin commented on KAFKA-7280:
-------------------------------
[~rsivaram]
* I am on version 1.1.0, and the stack trace is below. Is this a fatal
error, or will the heartbeat thread restart upon a subsequent poll() or
commitSync()?
{code:java}
2018-11-08 16:53:57,456 [kafka-coordinator-heartbeat-thread | ConsumerNginxRawCW] ERROR org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=ConsumerNginxRawCW] Heartbeat thread failed due to unexpected error
java.util.ConcurrentModificationException
    at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:711)
    at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:734)
    at org.apache.kafka.clients.FetchSessionHandler.responseDataToLogString(FetchSessionHandler.java:362)
    at org.apache.kafka.clients.FetchSessionHandler.handleResponse(FetchSessionHandler.java:423)
    at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:212)
    at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:202)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:300)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:948)
{code}
* I have not enabled TRACE-level logging. This is my log4j2 configuration:
{code:xml}
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="error" monitorInterval="60">
    <ThresholdFilter level="trace" />
    <Appenders>
        <RollingFile name="Appender" fileName="/var/log/worker-nginx-raw.log"
                     filePattern="/var/log/worker-nginx-raw-%d{yyyy-MM-dd}-%i.log.gz">
            <PatternLayout>
                <pattern>%d [%t] %-5level %logger{36} - %msg%n</pattern>
            </PatternLayout>
            <Policies>
                <SizeBasedTriggeringPolicy size="100 MB" />
                <TimeBasedTriggeringPolicy interval="1" />
            </Policies>
        </RollingFile>
    </Appenders>
    <Loggers>
        <Logger name="com.kafka-client" additivity="false">
            <AppenderRef ref="Appender" level="info" />
        </Logger>
        <Root level="all">
            <AppenderRef ref="Appender" level="error" />
        </Root>
    </Loggers>
</Configuration>
{code}
* Is there any workaround to avoid this problem? For example, could I listen
for the above exception from the heartbeat thread, wake up the thread doing
the actual poll/commit, and then spawn another thread that reinitializes
everything, including subscribe(...)?
> ConcurrentModificationException in FetchSessionHandler in heartbeat thread
> --------------------------------------------------------------------------
>
> Key: KAFKA-7280
> URL: https://issues.apache.org/jira/browse/KAFKA-7280
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 1.1.1, 2.0.0
> Reporter: Rajini Sivaram
> Assignee: Rajini Sivaram
> Priority: Critical
> Fix For: 1.1.2, 2.0.1, 2.1.0
>
>
> Request/response handling in FetchSessionHandler is not thread-safe, but the
> Kafka consumer uses it without any synchronization, even though poll() from
> the heartbeat thread can process responses. The heartbeat thread holds the
> coordinator lock while processing its poll and responses, which makes other
> operations involving the group coordinator safe. We also need to lock
> FetchSessionHandler for the operations that update or read
> FetchSessionHandler#sessionPartitions.
> This exception is from a system test run of
> TestSecurityRollingUpgrade.test_rolling_upgrade_sasl_mechanism_phase_two on trunk:
> {quote}[2018-08-12 06:13:22,316] ERROR [Consumer clientId=console-consumer, groupId=group] Heartbeat thread failed due to unexpected error (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> java.util.ConcurrentModificationException
>     at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
>     at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
>     at org.apache.kafka.clients.FetchSessionHandler.responseDataToLogString(FetchSessionHandler.java:362)
>     at org.apache.kafka.clients.FetchSessionHandler.handleResponse(FetchSessionHandler.java:424)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:216)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:206)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:304)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:996)
> {quote}
>
> The logs just prior to the exception show that a partition was removed from
> the session:
> {quote}[2018-08-12 06:13:22,315] TRACE [Consumer clientId=console-consumer, groupId=group] Skipping fetch for partition test_topic-1 because there is an in-flight request to worker4:9095 (id: 3 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, groupId=group] Completed receive from node 2 for FETCH with correlation id 417, received {throttle_time_ms=0,error_code=0,session_id=109800960,responses=[{topic=test_topic,partition_responses=[{partition_header={partition=2,error_code=0,high_watermark=184,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},record_set=[(record=DefaultRecord(offset=183, timestamp=1534054402327, key=0 bytes, value=3 bytes))]}]}]} (org.apache.kafka.clients.NetworkClient)
> [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, groupId=group] Added READ_UNCOMMITTED fetch request for partition test_topic-0 at offset 189 to node worker3:9095 (id: 2 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, groupId=group] Built incremental fetch (sessionId=109800960, epoch=237) for node 2. Added (), altered (), removed (test_topic-2) out of (test_topic-0) (org.apache.kafka.clients.FetchSessionHandler)
> [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, groupId=group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(test_topic-2), implied=(test_topic-0)) to broker worker3:9095 (id: 2 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, groupId=group] Sending FETCH {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=0,session_id=109800960,epoch=237,topics=[],forgotten_topics_data=[{topic=test_topic,partitions=[2]}]} with correlation id 418 to node 2 (org.apache.kafka.clients.NetworkClient)
> [2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, groupId=group] Skipping fetch for partition test_topic-2 because there is an in-flight request to worker3:9095 (id: 2 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2018-08-12 06:13:22,316] ERROR [Consumer clientId=console-consumer, groupId=group] Heartbeat thread failed due to unexpected error (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> java.util.ConcurrentModificationException
> {quote}
> The sequence in the logs shows:
> # FETCH response received
> # FetchSessionHandler#sessionPartitions is updated (a partition is removed)
> # New FETCH request is sent
> # Heartbeat thread throws ConcurrentModificationException while iterating over FetchSessionHandler#sessionPartitions
> This could be because 1) and 4) were on the heartbeat thread, while 2) and 3)
> were on the thread processing Consumer#poll().
>
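The race described in the issue can be illustrated with plain JDK collections. The following is a minimal, hypothetical sketch, not Kafka's FetchSessionHandler and not the actual patch: SessionPartitionsSketch, addPartition, removePartition, partitionsToLogString, reproduceUnsynchronizedRace and stressSynchronized are illustrative names. reproduceUnsynchronizedRace() replays the unsynchronized interleaving on a single thread (an iteration over a LinkedHashMap is in progress when a partition is removed), so the ConcurrentModificationException is deterministic. The synchronized methods assume the fix amounts to guarding every read and write of the session-partition map with one lock; stressSynchronized() runs a reader thread and a writer thread against that locked version.

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for FetchSessionHandler; not Kafka's class or its real fix.
class SessionPartitionsSketch {
    // Plays the role of FetchSessionHandler#sessionPartitions.
    private final Map<String, Long> sessionPartitions = new LinkedHashMap<>();

    // Write path (e.g. the poll() thread building the next fetch) takes the handler lock.
    synchronized void addPartition(String partition, long offset) {
        sessionPartitions.put(partition, offset);
    }

    synchronized void removePartition(String partition) {
        sessionPartitions.remove(partition);
    }

    // Read path (like responseDataToLogString on the heartbeat thread) holds the
    // same lock for the whole iteration, so no writer can interleave with it.
    synchronized String partitionsToLogString() {
        StringJoiner joiner = new StringJoiner(", ", "(", ")");
        for (String partition : sessionPartitions.keySet()) {
            joiner.add(partition);
        }
        return joiner.toString();
    }

    // Replays the unsynchronized interleaving on one thread; returns true if the
    // resumed iteration fails with ConcurrentModificationException.
    static boolean reproduceUnsynchronizedRace() {
        Map<String, Long> partitions = new LinkedHashMap<>();
        partitions.put("test_topic-0", 189L);
        partitions.put("test_topic-2", 183L);
        partitions.put("test_topic-1", 0L);
        Iterator<String> it = partitions.keySet().iterator();
        it.next();                          // "heartbeat thread" starts iterating
        partitions.remove("test_topic-2");  // "poll() thread" removes a partition
        try {
            it.next();                      // "heartbeat thread" resumes iterating
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Runs a reader and a writer thread against the locked version; returns true
    // if neither thread threw.
    static boolean stressSynchronized(int iterations) {
        SessionPartitionsSketch handler = new SessionPartitionsSketch();
        for (int i = 0; i < 10; i++) {
            handler.addPartition("test_topic-" + i, 0L);
        }
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread heartbeat = new Thread(() -> {
            try {
                for (int i = 0; i < iterations; i++) {
                    handler.partitionsToLogString();
                }
            } catch (Throwable t) {
                failure.compareAndSet(null, t);
            }
        });
        Thread poller = new Thread(() -> {
            try {
                for (int i = 0; i < iterations; i++) {
                    handler.removePartition("test_topic-2");
                    handler.addPartition("test_topic-2", i);
                }
            } catch (Throwable t) {
                failure.compareAndSet(null, t);
            }
        });
        heartbeat.start();
        poller.start();
        try {
            heartbeat.join();
            poller.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return failure.get() == null;
    }
}
```

The failure case is replayed on one thread because a genuinely concurrent, unlocked version would be expected to fail only intermittently, which makes it a poor demonstration. In the locked version, what matters is that the iteration and the updates hold the same monitor.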