This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.20.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.20.x by this push:
     new 20be7d5  CAMEL-12110: camel-kafka consumer swallows exception if error 
creating KafkaConsumer.
20be7d5 is described below

commit 20be7d5de76cc301ea05a7e3e4ef7204cddfa522
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Wed Jan 3 11:52:42 2018 +0100

    CAMEL-12110: camel-kafka consumer swallows exception if error creating 
KafkaConsumer.
---
 .../camel/component/kafka/KafkaConsumer.java       | 34 +++++++++++++++++-----
 1 file changed, 26 insertions(+), 8 deletions(-)

diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 52611fc..d9de379 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -101,6 +101,8 @@ public class KafkaConsumer extends DefaultConsumer {
         executor = endpoint.createExecutor();
         for (int i = 0; i < endpoint.getConfiguration().getConsumersCount(); 
i++) {
             KafkaFetchRecords task = new 
KafkaFetchRecords(endpoint.getConfiguration().getTopic(), i + "", getProps());
+            // pre-initialize task during startup so if there is any error we 
have it thrown asap
+            task.preInit();
             executor.submit(task);
             tasks.add(task);
         }
@@ -146,15 +148,14 @@ public class KafkaConsumer extends DefaultConsumer {
             boolean reConnect = true;
 
             while (reConnect) {
-
-                // create consumer
-                ClassLoader threadClassLoader = 
Thread.currentThread().getContextClassLoader();
                 try {
-                    // Kafka uses reflection for loading authentication 
settings, use its classloader
-                    
Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
-                    this.consumer = new 
org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
-                } finally {
-                    
Thread.currentThread().setContextClassLoader(threadClassLoader);
+                    if (!first) {
+                        // re-initialize on re-connect so we have a fresh 
consumer
+                        doInit();
+                    }
+                } catch (Throwable e) {
+                    // ensure this is logged so users can see the problem
+                    log.warn("Error creating 
org.apache.kafka.clients.consumer.KafkaConsumer due " + e.getMessage(), e);
                 }
 
                 if (!first) {
@@ -175,6 +176,23 @@ public class KafkaConsumer extends DefaultConsumer {
             }
         }
 
+        void preInit() {
+            doInit();
+        }
+
+        protected void doInit() {
+            // create consumer
+            ClassLoader threadClassLoader = 
Thread.currentThread().getContextClassLoader();
+            try {
+                // Kafka uses reflection for loading authentication settings, 
use its classloader
+                
Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
+                // this may throw an exception if something is wrong with 
kafka consumer
+                this.consumer = new 
org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
+            } finally {
+                
Thread.currentThread().setContextClassLoader(threadClassLoader);
+            }
+        }
+
         @SuppressWarnings("unchecked")
         protected boolean doRun() {
             // allow to re-connect thread in case we use that to retry failed 
messages

-- 
To stop receiving notification emails like this one, please contact
['"commits@camel.apache.org" <commits@camel.apache.org>'].

Reply via email to