Repository: camel
Updated Branches:
  refs/heads/camel-2.17.x 9fb6787c2 -> 914850846


CAMEL-9812: Camel leaves Kafka consumers running after shutdown


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/91485084
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/91485084
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/91485084

Branch: refs/heads/camel-2.17.x
Commit: 914850846263e0a1f4d6d33eea0ccf4212c2eb4d
Parents: 9fb6787
Author: Andrea Cosentino <anco...@gmail.com>
Authored: Tue Apr 5 11:02:03 2016 +0200
Committer: Andrea Cosentino <anco...@gmail.com>
Committed: Tue Apr 5 11:17:48 2016 +0200

----------------------------------------------------------------------
 .../org/apache/camel/component/kafka/KafkaConsumer.java   | 10 ++++++++++
 1 file changed, 10 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/91485084/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index e65ed3b..ab3f9f2 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -26,6 +26,7 @@ import org.apache.camel.impl.DefaultConsumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.errors.InterruptException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -127,8 +128,17 @@ public class KafkaConsumer extends DefaultConsumer {
                 }
                 LOG.debug("Unsubscribing {} from topic {}", threadId, 
topicName);
                 consumer.unsubscribe();
+                LOG.debug("Closing {} ", threadId);
+                consumer.close();
+            } catch (InterruptException e) {
+                getExceptionHandler().handleException("Interrupted while 
consuming " + threadId + " from kafka topic", e);
+                consumer.unsubscribe();
+                Thread.currentThread().interrupt();
             } catch (Exception e) {
                 getExceptionHandler().handleException("Error consuming " + 
threadId + " from kafka topic", e);
+            } finally {
+                LOG.debug("Closing {} ", threadId);
+                consumer.close();
             }
         }
 

Reply via email to