This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-2.19.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.19.x by this push:
     new 2a8120b  CAMEL-12031: KafkaConsumer stops consuming messages when exception occurs during offset commit
2a8120b is described below

commit 2a8120bbe8cab7a7e251857f4cd6564413cf5095
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Mon Dec 18 15:04:08 2017 +0100

    CAMEL-12031: KafkaConsumer stops consuming messages when exception occurs during offset commit
---
 .../java/org/apache/camel/component/kafka/KafkaConsumer.java | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 24b3bec..179f5ba 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -35,6 +35,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
 
@@ -172,6 +173,7 @@ public class KafkaConsumer extends DefaultConsumer {
         protected boolean doRun() {
             // allow to re-connect thread in case we use that to retry failed messages
             boolean reConnect = false;
+            boolean unsubscribing = false;
 
             try {
                 log.info("Subscribing {} to topic {}", threadId, topicName);
@@ -292,12 +294,22 @@ public class KafkaConsumer extends DefaultConsumer {
                 }
 
                 log.info("Unsubscribing {} from topic {}", threadId, topicName);
+                // we are unsubscribing so do not re connect
+                unsubscribing = true;
                 consumer.unsubscribe();
             } catch (InterruptException e) {
                 getExceptionHandler().handleException("Interrupted while consuming " + threadId + " from kafka topic", e);
                 log.info("Unsubscribing {} from topic {}", threadId, topicName);
                 consumer.unsubscribe();
                 Thread.currentThread().interrupt();
+            } catch (KafkaException e) {
+                // some kind of error in kafka, it may happen during unsubscribing or during normal processing
+                if (unsubscribing) {
+                    getExceptionHandler().handleException("Error unsubscribing " + threadId + " from kafka topic " + topicName, e);
+                } else {
+                    log.warn("KafkaException consuming {} from topic {}. Will attempt to re-connect on next run", threadId, topicName);
+                    reConnect = true;
+                }
             } catch (Exception e) {
                 getExceptionHandler().handleException("Error consuming " + threadId + " from kafka topic", e);
             } finally {

-- 
To stop receiving notification emails like this one, please contact "commits@camel.apache.org" <commits@camel.apache.org>.