Hi Arvid,
Thanks for your reply.
Yes, the warning is thrown by kafka-clients. Here is the warning logged after I
deleted the topic that the Kafka consumer was listening to.
18:46:27,297 WARN org.apache.kafka.clients.NetworkClient - [Consumer
clientId=consumer-2, groupId=osstest] Error while fetching metadata with
correlation id 30288 : {oss-consumer-test-1=UNKNOWN_TOPIC_OR_PARTITION,
oss-consumer-test-2=UNKNOWN_TOPIC_OR_PARTITION}
I have a follow-up question.
“What you could do is try to config Kafka consumer to fail hard when topic
metadata cannot be retrieved with a small timeout.”
May I ask how to configure the Flink Kafka consumer to fail when the metadata
of one of its topics cannot be retrieved?
For example, a FlinkKafkaConsumer is listening to 3 different Kafka topics. If
one of the Kafka topics is deleted and FlinkKafkaConsumer cannot retrieve the
corresponding topic metadata, we hope that FlinkKafkaConsumer will fail hard
and stop consuming the other two live topics.
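To make the requested behavior concrete, the check could be sketched roughly as
follows. This is plain Java with hypothetical names (TopicAvailabilityCheck,
missingTopics, failIfAnyMissing, and the topic names), not part of the Flink or
Kafka API. The set of existing topics is simply passed in; in a real job it
would have to come from the cluster, for example via Kafka's
AdminClient.listTopics().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, for illustration only; not a Flink or Kafka class.
class TopicAvailabilityCheck {

    // Returns the subscribed topics that are absent from the set of existing topics.
    static List<String> missingTopics(Collection<String> subscribed, Set<String> existing) {
        List<String> missing = new ArrayList<>();
        for (String topic : subscribed) {
            if (!existing.contains(topic)) {
                missing.add(topic);
            }
        }
        return missing;
    }

    // Throws if at least one subscribed topic is missing, so the caller stops
    // instead of continuing with the remaining live topics.
    static void failIfAnyMissing(Collection<String> subscribed, Set<String> existing) {
        List<String> missing = missingTopics(subscribed, existing);
        if (!missing.isEmpty()) {
            throw new IllegalStateException("Topics without metadata: " + missing);
        }
    }

    public static void main(String[] args) {
        // Three subscribed topics, one of which has been deleted (names are made up).
        List<String> subscribed = Arrays.asList("topic-a", "topic-b", "topic-c");
        Set<String> existing = new HashSet<>(Arrays.asList("topic-a", "topic-b"));
        try {
            failIfAnyMissing(subscribed, existing);
        } catch (IllegalStateException e) {
            System.out.println("Failing hard: " + e.getMessage());
        }
    }
}
```

Where such a check would be hooked into FlinkKafkaConsumer is exactly the open
question of this mail.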
Thank you very much!
Thanks, Yan
From: Arvid Heise <[email protected]>
Date: Thursday, September 2, 2021 at 1:27 PM
To: Yan Wang <[email protected]>
Cc: [email protected] <[email protected]>
Subject: [External] : Re: Use FlinkKafkaConsumer to synchronize multiple Kafka
topics
Hi Yan,
Afaik this is not directly supported and would be surprising to other users
since it's a rather specific requirement.
In fact, Flink delegates reading the topics to Kafka consumer API and I suspect
that the warning you received is also coming from Kafka consumer (I have not
found a respective warning in Flink's code base but you could also show the
exact log statement so I can recheck).
What you could do is try to config Kafka consumer to fail hard when topic
metadata cannot be retrieved with a small timeout.
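A minimal sketch of the kind of properties this could involve, assuming a
kafka-clients version that knows these keys. The class name
FailFastConsumerConfig, the broker address, and all values are placeholders.
The keys themselves are existing Kafka consumer settings
(default.api.timeout.ms, allow.auto.create.topics) and the Flink partition
discovery setting (flink.partition-discovery.interval-millis). Whether any of
this escalates the UNKNOWN_TOPIC_OR_PARTITION warning into a job failure is not
confirmed here and would need to be tested against the versions in use.

```java
import java.util.Properties;

// Sketch only: builds the Properties object that is passed to FlinkKafkaConsumer.
class FailFastConsumerConfig {

    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Kafka consumer setting: upper bound for blocking client API calls
        // (placeholder value, lower than the 60000 ms default).
        props.setProperty("default.api.timeout.ms", "30000");
        // Kafka consumer setting: ask the client not to trigger auto-creation
        // of a topic that no longer exists.
        props.setProperty("allow.auto.create.topics", "false");
        // Flink setting: enable periodic partition discovery (placeholder interval),
        // so topic metadata is fetched again while the job is running.
        props.setProperty("flink.partition-discovery.interval-millis", "30000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "osstest");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```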
Note that I'm a bit confused by the terms "dead" topic and "rebooted" topic.
Afaik you can only have dead brokers and rebooted brokers and maybe deleted
topics. But I have yet to understand a use case where you would delete a topic
while the consumer is running.
On Thu, Sep 2, 2021 at 4:58 AM Yan Wang <[email protected]> wrote:
Hi,
We are currently using a single FlinkKafkaConsumer to consume multiple Kafka
topics. However, we find that if one of the Kafka topics goes down at run
time (for example, when we reboot one of the topics), the FlinkKafkaConsumer
keeps logging warning messages about the dead Kafka topic and continues to
consume the other live Kafka topics.
What we want instead is that, if one of the topics goes down, the
FlinkKafkaConsumer waits and stops consuming the other live topics until the
dead topic is live again.
Code example:
List<String> kafkaTopicsList = new ArrayList<>(Arrays.asList("KafkaTopic1",
"KafkaTopic2"));
FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
kafkaTopicsList, new SimpleStringSchema(), properties);
As shown in the code example, kafkaTopicsList contains two Kafka topics, and
flinkKafkaConsumer consumes both of them. We hope that if KafkaTopic1 goes
down at run time (we may reboot KafkaTopic1 at run time), the flinkKafkaConsumer
will wait and stop consuming KafkaTopic2 until KafkaTopic1 goes live again.
May I ask whether it is possible to achieve this with the current Flink API? Do
we need to change a configuration somewhere, or do we have to subclass
FlinkKafkaConsumer to achieve this? Thank you very much!
Thanks, Yan