This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 295d39c CAMEL-13630: Added a bit of logging and only close kafka producer if its created by Camel itself. 295d39c is described below commit 295d39c153a013274319f9b66bc1e49fc7c89032 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon Jun 10 09:40:55 2019 +0200 CAMEL-13630: Added a bit of logging and only close kafka producer if its created by Camel itself. --- .../main/java/org/apache/camel/component/kafka/KafkaProducer.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index 16ba065..c43d824 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -53,6 +53,7 @@ public class KafkaProducer extends DefaultAsyncProducer { private final KafkaEndpoint endpoint; private ExecutorService workerPool; private boolean shutdownWorkerPool; + private volatile boolean closeKafkaProducer; public KafkaProducer(KafkaEndpoint endpoint) { super(endpoint); @@ -106,10 +107,13 @@ public class KafkaProducer extends DefaultAsyncProducer { try { // Kafka uses reflection for loading authentication settings, use its classloader Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader()); + log.trace("Creating KafkaProducer"); kafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer(props); + closeKafkaProducer = true; } finally { Thread.currentThread().setContextClassLoader(threadClassLoader); } + log.debug("Created KafkaProducer: {}", kafkaProducer); } // if we are in asynchronous mode we need a worker pool @@ -122,8 +126,10 @@ public class KafkaProducer extends DefaultAsyncProducer { @Override protected void doStop() throws Exception { - if (kafkaProducer != null) { + if (kafkaProducer != null && closeKafkaProducer) { + log.debug("Closing KafkaProducer: {}", kafkaProducer); kafkaProducer.close(); + kafkaProducer = null; } if (shutdownWorkerPool && workerPool != null) {