This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.19.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.19.x by this push:
     new 2a8120b  CAMEL-12031: KafkaConsumer stops consuming messages when 
exception occurs during offset commit
2a8120b is described below

commit 2a8120bbe8cab7a7e251857f4cd6564413cf5095
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Mon Dec 18 15:04:08 2017 +0100

    CAMEL-12031: KafkaConsumer stops consuming messages when exception occurs 
during offset commit
---
 .../java/org/apache/camel/component/kafka/KafkaConsumer.java | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 24b3bec..179f5ba 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -35,6 +35,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
 
@@ -172,6 +173,7 @@ public class KafkaConsumer extends DefaultConsumer {
         protected boolean doRun() {
             // allow to re-connect thread in case we use that to retry failed 
messages
             boolean reConnect = false;
+            boolean unsubscribing = false;
 
             try {
                 log.info("Subscribing {} to topic {}", threadId, topicName);
@@ -292,12 +294,22 @@ public class KafkaConsumer extends DefaultConsumer {
                 }
 
                 log.info("Unsubscribing {} from topic {}", threadId, 
topicName);
+                // we are unsubscribing so do not re connect
+                unsubscribing = true;
                 consumer.unsubscribe();
             } catch (InterruptException e) {
                 getExceptionHandler().handleException("Interrupted while 
consuming " + threadId + " from kafka topic", e);
                 log.info("Unsubscribing {} from topic {}", threadId, 
topicName);
                 consumer.unsubscribe();
                 Thread.currentThread().interrupt();
+            } catch (KafkaException e) {
+                // some kind of error in kafka, it may happen during 
unsubscribing or during normal processing
+                if (unsubscribing) {
+                    getExceptionHandler().handleException("Error unsubscribing 
" + threadId + " from kafka topic " + topicName, e);
+                } else {
+                    log.warn("KafkaException consuming {} from topic {}. Will 
attempt to re-connect on next run", threadId, topicName);
+                    reConnect = true;
+                }
             } catch (Exception e) {
                 getExceptionHandler().handleException("Error consuming " + 
threadId + " from kafka topic", e);
             } finally {

-- 
To stop receiving notification emails like this one, please contact
['"commits@camel.apache.org" <commits@camel.apache.org>'].

Reply via email to