[
https://issues.apache.org/jira/browse/KAFKA-8100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16817951#comment-16817951
]
Shengnan YU edited comment on KAFKA-8100 at 4/15/19 1:07 PM:
-------------------------------------------------------------
Hi, could you please explain why the client does not first remove this topic
from its metadata when it hits UNKNOWN_TOPIC_OR_PARTITION? If the topic
actually exists, it can easily be discovered again.
was (Author: ysn2233):
Hi could you please explain why not delete this topic info in metadata first
when hitting UNKNOWN_TOPIC_OR_PARTITION because it can be easily discovered if
the topic actually exists.
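To make the suggestion above concrete, here is a toy model of it. It is only a
sketch and not the Kafka client's Metadata class: ToyTopicList, evictOnUnknown
and refresh are names invented for illustration, and the "cluster" is just a
set of strings. It assumes a pattern subscription that rediscovers every
matching topic on each refresh.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

/** Toy model of a client-side topic list (hypothetical, not Kafka code). */
class ToyTopicList {
    private final Set<String> tracked = new TreeSet<>();
    // Hypothetical switch: drop a topic once the cluster reports it as unknown.
    private final boolean evictOnUnknown;

    ToyTopicList(boolean evictOnUnknown) {
        this.evictOnUnknown = evictOnUnknown;
    }

    /**
     * Simulates one metadata refresh against the topics that currently exist.
     * Returns the tracked topics reported as unknown (one warning each).
     */
    Set<String> refresh(Set<String> clusterTopics, Pattern subscription) {
        // Pattern subscription: pick up every existing topic that matches.
        for (String topic : clusterTopics) {
            if (subscription.matcher(topic).matches()) {
                tracked.add(topic);
            }
        }
        // Topics still tracked by the client but gone from the cluster.
        Set<String> unknown = new TreeSet<>(tracked);
        unknown.removeAll(clusterTopics);
        if (evictOnUnknown) {
            tracked.removeAll(unknown);
        }
        return unknown;
    }

    Set<String> tracked() {
        return new TreeSet<>(tracked);
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("^test.*$");
        Set<String> cluster = new TreeSet<>(Arrays.asList("test1", "test2", "test3"));

        ToyTopicList sticky = new ToyTopicList(false);   // no eviction
        ToyTopicList evicting = new ToyTopicList(true);  // evict on unknown
        sticky.refresh(cluster, pattern);
        evicting.refresh(cluster, pattern);

        cluster.remove("test2"); // simulate deleting a topic
        for (int i = 1; i <= 2; i++) {
            System.out.println("refresh " + i
                    + " sticky unknown=" + sticky.refresh(cluster, pattern)
                    + " evicting unknown=" + evicting.refresh(cluster, pattern));
        }

        cluster.add("test2"); // simulate re-creating the topic
        evicting.refresh(cluster, pattern);
        System.out.println("evicting tracks " + evicting.tracked());
    }
}
```

In this model the list without eviction reports test2 as unknown on every
refresh, while the evicting list reports it once and then picks it up again
when the topic is re-created. Whether the real client can safely behave this
way is exactly the question.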
> If delete expired topic, kafka consumer will keep flushing unknown_topic
> warning in log
> ---------------------------------------------------------------------------------------
>
> Key: KAFKA-8100
> URL: https://issues.apache.org/jira/browse/KAFKA-8100
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 1.1.1, 2.1.1
> Reporter: Shengnan YU
> Priority: Major
>
> Recently we used Flink to consume Kafka topics matched by a regex pattern. We
> found that after we deleted some unused topics, the logs kept filling up with
> UNKNOWN_TOPIC_OR_PARTITION warnings.
> I studied the source code of the Kafka client and found that, for the
> consumer, topicExpiry is disabled in Metadata. As a result, even after a
> topic is deleted, the client still has it in the metadata's topic list and
> keeps fetching it from the servers.
> Is there a good way to avoid these annoying warnings without modifying
> Kafka's source code? (We still need the 'real' unknown-topic warnings in the
> logs, that is, the ones not caused by an outdated topic.)
> The following code reproduces the problem: create several topics such as
> "test1", "test2", "test3", ..., "testn" in the Kafka cluster, then delete any
> one of them while the consumer is running.
> {code:java}
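> import java.util.Collection;
> import java.util.Properties;
> import java.util.regex.Pattern;
>
> import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
>
> // The wrapper class name is arbitrary; the original snippet showed only main().
> public class UnknownTopicRepro {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092");
>         props.put("group.id", "test10");
>         props.put("enable.auto.commit", "true");
>         props.put("auto.commit.interval.ms", "1000");
>         props.put("auto.offset.reset", "earliest");
>         props.put("key.deserializer",
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("value.deserializer",
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         // Force a metadata refresh at least every 60 seconds.
>         props.put("metadata.max.age.ms", "60000");
>         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
>
>         class PartitionOffsetAssignerListener implements ConsumerRebalanceListener {
>             private final KafkaConsumer<String, String> consumer;
>
>             PartitionOffsetAssignerListener(KafkaConsumer<String, String> kafkaConsumer) {
>                 this.consumer = kafkaConsumer;
>             }
>
>             @Override
>             public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>             }
>
>             @Override
>             public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>                 // Read all newly assigned partitions from the beginning.
>                 consumer.seekToBeginning(partitions);
>             }
>         }
>
>         // Subscribe by pattern so that every topic starting with "test" is consumed.
>         consumer.subscribe(Pattern.compile("^test.*$"),
>                 new PartitionOffsetAssignerListener(consumer));
>         while (true) {
>             // poll(long) exists in both affected versions (deprecated since 2.0).
>             ConsumerRecords<String, String> records = consumer.poll(100);
>             for (ConsumerRecord<String, String> record : records) {
>                 System.out.printf("offset = %d, key = %s, value = %s%n",
>                         record.offset(), record.key(), record.value());
>             }
>         }
>     }
> }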
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)