This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-2.20.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.20.x by this push:
     new 4baceea  CAMEL-12031: KafkaConsumer stops consuming messages when exception occurs during offset commit
4baceea is described below

commit 4baceea76c9cf794a4bd65f1bd6568e65badd11b
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Mon Dec 18 15:04:08 2017 +0100

    CAMEL-12031: KafkaConsumer stops consuming messages when exception occurs during offset commit
---
 .../java/org/apache/camel/component/kafka/KafkaConsumer.java | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 19c9884..52611fc 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -36,6 +36,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
 
@@ -178,6 +179,7 @@ public class KafkaConsumer extends DefaultConsumer {
         protected boolean doRun() {
             // allow to re-connect thread in case we use that to retry failed messages
             boolean reConnect = false;
+            boolean unsubscribing = false;
 
             try {
                 log.info("Subscribing {} to topic {}", threadId, topicName);
@@ -298,12 +300,22 @@ public class KafkaConsumer extends DefaultConsumer {
                 }
 
                 log.info("Unsubscribing {} from topic {}", threadId, topicName);
+                // we are unsubscribing so do not re connect
+                unsubscribing = true;
                 consumer.unsubscribe();
             } catch (InterruptException e) {
                 getExceptionHandler().handleException("Interrupted while consuming " + threadId + " from kafka topic", e);
                 log.info("Unsubscribing {} from topic {}", threadId, topicName);
                 consumer.unsubscribe();
                 Thread.currentThread().interrupt();
+            } catch (KafkaException e) {
+                // some kind of error in kafka, it may happen during unsubscribing or during normal processing
+                if (unsubscribing) {
+                    getExceptionHandler().handleException("Error unsubscribing " + threadId + " from kafka topic " + topicName, e);
+                } else {
+                    log.warn("KafkaException consuming {} from topic {}. Will attempt to re-connect on next run", threadId, topicName);
+                    reConnect = true;
+                }
             } catch (Exception e) {
                 getExceptionHandler().handleException("Error consuming " + threadId + " from kafka topic", e);
             } finally {

-- 
To stop receiving notification emails like this one, please contact
['"commits@camel.apache.org" <commits@camel.apache.org>'].