Repository: camel
Updated Branches:
  refs/heads/master 75b424d0d -> d90a3f9d8

CAMEL-9812: Camel leaves Kafka consumers running after shutdown

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d90a3f9d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d90a3f9d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d90a3f9d

Branch: refs/heads/master
Commit: d90a3f9d89f7ee4e12efad65a099eef3ef2e532e
Parents: 75b424d
Author: Andrea Cosentino <anco...@gmail.com>
Authored: Tue Apr 5 11:02:03 2016 +0200
Committer: Andrea Cosentino <anco...@gmail.com>
Committed: Tue Apr 5 11:04:15 2016 +0200

----------------------------------------------------------------------
 .../java/org/apache/camel/component/kafka/KafkaConsumer.java | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/camel/blob/d90a3f9d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index a2f2d5b..ad02258 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -135,12 +135,17 @@ public class KafkaConsumer extends DefaultConsumer {
                     }
                     LOG.debug("Unsubscribing {} from topic {}", threadId, topicName);
                     consumer.unsubscribe();
+                    LOG.debug("Closing {} ", threadId);
+                    consumer.close();
                 } catch (InterruptException e) {
                     getExceptionHandler().handleException("Interrupted while consuming " + threadId + " from kafka topic", e);
                     consumer.unsubscribe();
                     Thread.currentThread().interrupt();
                 } catch (Exception e) {
                     getExceptionHandler().handleException("Error consuming " + threadId + " from kafka topic", e);
+                } finally {
+                    LOG.debug("Closing {} ", threadId);
+                    consumer.close();
                 }
             }