[ https://issues.apache.org/jira/browse/KAFKA-20232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kirk True updated KAFKA-20232:
------------------------------
Component/s: clients, consumer
> KafkaConsumer#commitAsync throws unexpected WakeupException in
> awaitMetadataUpdate()
> ------------------------------------------------------------------------------------
>
> Key: KAFKA-20232
> URL: https://issues.apache.org/jira/browse/KAFKA-20232
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Reporter: Brian Sang
> Priority: Minor
>
> This is basically the same problem reported in
> https://issues.apache.org/jira/browse/KAFKA-14208, but it occurs in a
> slightly different code path:
> {code:java}
> Caused by: org.apache.kafka.common.errors.WakeupException
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:529)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:277)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReadyAsync(AbstractCoordinator.java:251)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.coordinatorUnknownAndUnreadyAsync(ConsumerCoordinator.java:501)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:1080)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1574)
>     at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.notifyCheckpointComplete(KafkaPartitionSplitReader.java:258)
>     at org.apache.flink.connector.kafka.source.reader.fetcher.KafkaSourceFetcherManager$1.run(KafkaSourceFetcherManager.java:104)
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
>     ... 6 more
> {code}
> That is, the exception is thrown on the code path that starts at
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L309]
> The earlier fix for KAFKA-14208 was to use a new poll() invocation that
> takes a disableWakeup flag, and I think a similar fix would work here. I
> will create a PR and attach it to this ticket.
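
To make the disableWakeup idea concrete, here is a minimal, self-contained sketch. It is not Kafka code: WakeupSketch, Client, and SketchWakeupException are hypothetical stand-ins, and the real ConsumerNetworkClient does far more than this. It only illustrates the behavior such a flag is meant to give, assuming the flag is threaded from the metadata wait down into the poll call: a pending wakeup is left untouched on the async-commit path and is still delivered on the next path that allows wakeups.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified model for illustration only.
// None of these types are part of the Kafka client library.
final class WakeupSketch {

    /** Stand-in for org.apache.kafka.common.errors.WakeupException. */
    static final class SketchWakeupException extends RuntimeException {
        private static final long serialVersionUID = 1L;
    }

    /** Toy network client that models only the wakeup flag and a metadata wait. */
    static final class Client {
        private final AtomicBoolean wakeup = new AtomicBoolean(false);
        private boolean metadataFresh = false;

        /** Request that the next wakeup-enabled poll throws. */
        void wakeup() {
            wakeup.set(true);
        }

        boolean wakeupPending() {
            return wakeup.get();
        }

        /** One poll iteration. With disableWakeup set, a pending wakeup is left in place. */
        void poll(boolean disableWakeup) {
            if (!disableWakeup && wakeup.compareAndSet(true, false)) {
                throw new SketchWakeupException();
            }
            // Assumption for the sketch: a single poll is enough to refresh metadata.
            metadataFresh = true;
        }

        /** Waits for a metadata refresh, passing the flag through to poll(). */
        void awaitMetadataUpdate(boolean disableWakeup) {
            metadataFresh = false;
            while (!metadataFresh) {
                poll(disableWakeup);
            }
        }
    }

    public static void main(String[] args) {
        Client client = new Client();
        client.wakeup();

        // Async-commit style path: wakeups disabled, so nothing is thrown here.
        client.awaitMetadataUpdate(true);
        System.out.println("wakeup still pending: " + client.wakeupPending());

        // A later wakeup-enabled call still observes the pending wakeup.
        try {
            client.awaitMetadataUpdate(false);
        } catch (SketchWakeupException e) {
            System.out.println("wakeup delivered on the wakeup-enabled path");
        }
    }
}
```

The point of the design is that the wakeup request is deferred, not dropped, so a caller that relies on wakeup() to interrupt a later wakeup-enabled call still gets the exception there.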