This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.20.x
in repository https://gitbox.apache.org/repos/asf/camel.git

The following commit(s) were added to refs/heads/camel-2.20.x by this push:
     new 20be7d5  CAMEL-12110: camel-kafka consumer swallows exception if error creating KafkaConsumer.
20be7d5 is described below

commit 20be7d5de76cc301ea05a7e3e4ef7204cddfa522
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Wed Jan 3 11:52:42 2018 +0100

    CAMEL-12110: camel-kafka consumer swallows exception if error creating KafkaConsumer.
---
 .../camel/component/kafka/KafkaConsumer.java | 34 +++++++++++++++++-----
 1 file changed, 26 insertions(+), 8 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 52611fc..d9de379 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -101,6 +101,8 @@ public class KafkaConsumer extends DefaultConsumer {
         executor = endpoint.createExecutor();
         for (int i = 0; i < endpoint.getConfiguration().getConsumersCount(); i++) {
             KafkaFetchRecords task = new KafkaFetchRecords(endpoint.getConfiguration().getTopic(), i + "", getProps());
+            // pre-initialize task during startup so if there is any error we have it thrown asap
+            task.preInit();
             executor.submit(task);
             tasks.add(task);
         }
@@ -146,15 +148,14 @@ public class KafkaConsumer extends DefaultConsumer {
             boolean reConnect = true;
 
             while (reConnect) {
-
-                // create consumer
-                ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
                 try {
-                    // Kafka uses reflection for loading authentication settings, use its classloader
-                    Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
-                    this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
-                } finally {
-                    Thread.currentThread().setContextClassLoader(threadClassLoader);
+                    if (!first) {
+                        // re-initialize on re-connect so we have a fresh consumer
+                        doInit();
+                    }
+                } catch (Throwable e) {
+                    // ensure this is logged so users can see the problem
+                    log.warn("Error creating org.apache.kafka.clients.consumer.KafkaConsumer due " + e.getMessage(), e);
                 }
 
                 if (!first) {
@@ -175,6 +176,23 @@ public class KafkaConsumer extends DefaultConsumer {
             }
         }
 
+        void preInit() {
+            doInit();
+        }
+
+        protected void doInit() {
+            // create consumer
+            ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
+            try {
+                // Kafka uses reflection for loading authentication settings, use its classloader
+                Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
+                // this may throw an exception if something is wrong with kafka consumer
+                this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
+            } finally {
+                Thread.currentThread().setContextClassLoader(threadClassLoader);
+            }
+        }
+
         @SuppressWarnings("unchecked")
         protected boolean doRun() {
             // allow to re-connect thread in case we use that to retry failed messages

-- 
To stop receiving notification emails like this one, please contact
['"commits@camel.apache.org" <commits@camel.apache.org>'].