Repository: camel
Updated Branches:
  refs/heads/camel-2.17.x 9fb6787c2 -> 914850846
CAMEL-9812: Camel leaves Kafka consumers running after shutdown

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/91485084
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/91485084
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/91485084

Branch: refs/heads/camel-2.17.x
Commit: 914850846263e0a1f4d6d33eea0ccf4212c2eb4d
Parents: 9fb6787
Author: Andrea Cosentino <anco...@gmail.com>
Authored: Tue Apr 5 11:02:03 2016 +0200
Committer: Andrea Cosentino <anco...@gmail.com>
Committed: Tue Apr 5 11:17:48 2016 +0200

----------------------------------------------------------------------
 .../org/apache/camel/component/kafka/KafkaConsumer.java | 10 ++++++++++
 1 file changed, 10 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/camel/blob/91485084/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index e65ed3b..ab3f9f2 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -26,6 +26,7 @@ import org.apache.camel.impl.DefaultConsumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.errors.InterruptException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -127,8 +128,17 @@ public class KafkaConsumer extends DefaultConsumer {
                 }
                 LOG.debug("Unsubscribing {} from topic {}", threadId, topicName);
                 consumer.unsubscribe();
+                LOG.debug("Closing {} ", threadId);
+                consumer.close();
+            } catch (InterruptException e) {
+                getExceptionHandler().handleException("Interrupted while consuming " + threadId + " from kafka topic", e);
+                consumer.unsubscribe();
+                Thread.currentThread().interrupt();
             } catch (Exception e) {
                 getExceptionHandler().handleException("Error consuming " + threadId + " from kafka topic", e);
+            } finally {
+                LOG.debug("Closing {} ", threadId);
+                consumer.close();
             }
         }