[
https://issues.apache.org/jira/browse/KAFKA-8506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16858771#comment-16858771
]
Gaurav commented on KAFKA-8506:
-------------------------------
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, 0);
consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList("odd", "even"));
long startTinme = System.currentTimeMillis();
try {
    while (true) {
        ConsumerRecords<String, String> recs = consumer.poll(1000);
        if (recs == null || recs.isEmpty()) {
            return "Success";
        }
        for (ConsumerRecord<String, String> rec : recs) {
            System.out.printf("Recieved %s: %s", rec.key(), rec.value());
        }
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    System.out.println(System.currentTimeMillis() - startTinme);
}
return "Failure";
}
> Kafka Consumer broker never stops on connection failure
> -------------------------------------------------------
>
> Key: KAFKA-8506
> URL: https://issues.apache.org/jira/browse/KAFKA-8506
> Project: Kafka
> Issue Type: Bug
> Reporter: Gaurav
> Priority: Critical
>
> Not able to stop the Kafka broker on connection failure; it keeps retrying to
> create a connection on calling consumer.poll(1000). We poll on demand via REST;
> in case of connection failure it should stop and throw an exception.
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)