[ 
https://issues.apache.org/jira/browse/KAFKA-20232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-20232:
------------------------------
    Component/s: clients
                 consumer

> KafkaConsumer#commitAsync throws unexpected WakeupException in 
> awaitMetadataUpdate()
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-20232
>                 URL: https://issues.apache.org/jira/browse/KAFKA-20232
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>            Reporter: Brian Sang
>            Priority: Minor
>
> This is basically the same problem reported in 
> https://issues.apache.org/jira/browse/KAFKA-14208, but it occurs in a 
> slightly different code path:
> {code:java}
> Caused by: org.apache.kafka.common.errors.WakeupException
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:529)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:277)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReadyAsync(AbstractCoordinator.java:251)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.coordinatorUnknownAndUnreadyAsync(ConsumerCoordinator.java:501)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:1080)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1574)
>       at 
> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.notifyCheckpointComplete(KafkaPartitionSplitReader.java:258)
>       at 
> org.apache.flink.connector.kafka.source.reader.fetcher.KafkaSourceFetcherManager$1.run(KafkaSourceFetcherManager.java:104)
>       at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
>       ... 6 more {code}
> i.e. starting from 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L309]
> The fix before was to use a new poll() invocation with a disableWakeup flag, 
> and I think a similar fix would work here. I will create a PR and attach it 
> to this ticket.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to