Shengnan YU created KAFKA-8100:
----------------------------------
Summary: If delete expired topic, kafka consumer will keep
flushing unknown_topic warning in log
Key: KAFKA-8100
URL: https://issues.apache.org/jira/browse/KAFKA-8100
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 2.1.1, 1.1.1
Reporter: Shengnan YU
Recently we used Flink to consume Kafka topics with a regex pattern. We
found that after we deleted some unused topics, the logs kept filling up with
UNKNOWN_TOPIC_EXCEPTION warnings.
I studied the Kafka client source code and found that, for the consumer,
topicExpiry is disabled in Metadata. As a result, even after a topic has been
deleted, the client still keeps it in the metadata's topic list and keeps
requesting it from the servers.
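
The behaviour described above can be illustrated with a small toy model. This is only a sketch under stated assumptions, not the real Kafka Metadata class: the class name ToyMetadata, its methods, and the five-minute expiry window are all hypothetical and chosen for illustration. It shows that when expiry is switched off, a topic added once stays in the list on every refresh.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Toy model of a client-side topic list. NOT the real Kafka Metadata class. */
class ToyMetadata {
    // Hypothetical expiry window, assumed here only for illustration.
    static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000L;

    private final boolean topicExpiryEnabled;
    // topic name -> timestamp (ms) at which the topic was last added
    private final Map<String, Long> topics = new HashMap<>();

    public ToyMetadata(boolean topicExpiryEnabled) {
        this.topicExpiryEnabled = topicExpiryEnabled;
    }

    public void add(String topic, long nowMs) {
        topics.put(topic, nowMs);
    }

    /** Simulates a periodic metadata refresh at time nowMs. */
    public void update(long nowMs) {
        if (!topicExpiryEnabled) {
            return; // nothing is ever dropped, so stale topics stay in the list
        }
        topics.values().removeIf(lastUsed -> nowMs - lastUsed > TOPIC_EXPIRY_MS);
    }

    public Set<String> topics() {
        return topics.keySet();
    }

    public static void main(String[] args) {
        ToyMetadata consumerLike = new ToyMetadata(false); // expiry disabled
        ToyMetadata withExpiry = new ToyMetadata(true);    // expiry enabled
        consumerLike.add("test1", 0L);
        withExpiry.add("test1", 0L);

        long later = TOPIC_EXPIRY_MS + 1;
        consumerLike.update(later);
        withExpiry.update(later);

        System.out.println(consumerLike.topics()); // prints [test1]
        System.out.println(withExpiry.topics());   // prints []
    }
}
```

In this model the instance with expiry disabled would keep asking about "test1" indefinitely, which matches the repeated warning reported here.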
Is there a good way to avoid these annoying warnings without modifying Kafka's
source code? (We still need the 'real' unknown topic exceptions in the logs,
that is, those not caused by an outdated, deleted topic.)
The following code can be used to reproduce the problem: create multiple
topics such as "test1", "test2", "test3" ... "testn" in the Kafka cluster,
then delete any one of them while the consumer is running.
{code:java}
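import java.util.Collection;
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// The wrapper class name is arbitrary; it only makes the snippet compilable.
public class UnknownTopicRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // stray "\n" removed
        props.put("group.id", "test10");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Refresh metadata every 60 seconds.
        props.put("metadata.max.age.ms", "60000");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        class PartitionOffsetAssignerListener implements ConsumerRebalanceListener {
            private final KafkaConsumer<String, String> consumer;

            PartitionOffsetAssignerListener(KafkaConsumer<String, String> kafkaConsumer) {
                this.consumer = kafkaConsumer;
            }

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Nothing to do on revocation.
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Read all newly assigned partitions from the beginning.
                consumer.seekToBeginning(partitions);
            }
        }

        // Subscribe to every topic whose name starts with "test".
        consumer.subscribe(Pattern.compile("^test.*$"),
                new PartitionOffsetAssignerListener(consumer));
        while (true) {
            // poll(long) is deprecated since 2.0 but exists in both affected versions.
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}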
{code}