CAMEL-8653: camel-kafka consumer should commit offset on consumer when it's no 
longer running, such as when suspending/shutting down.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/805db35b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/805db35b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/805db35b

Branch: refs/heads/master
Commit: 805db35b482b0f3b65df80906e1d42dcea1b3c9e
Parents: e09c51d
Author: Claus Ibsen <davscl...@apache.org>
Authored: Fri Aug 7 16:56:36 2015 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Fri Aug 7 16:56:36 2015 +0200

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaConsumer.java    | 24 ++++++++++++++++----
 1 file changed, 19 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/805db35b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index cb2dc9d..94893fb 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -33,6 +33,7 @@ import kafka.message.MessageAndMetadata;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,6 +73,7 @@ public class KafkaConsumer extends DefaultConsumer {
     protected void doStart() throws Exception {
         super.doStart();
         log.info("Starting Kafka consumer");
+
         executor = endpoint.createExecutor();
         for (int i = 0; i < endpoint.getConsumersCount(); i++) {
             ConsumerConnector consumer = 
kafka.consumer.Consumer.createJavaConsumerConnector(new 
ConsumerConfig(getProps()));
@@ -79,8 +81,10 @@ public class KafkaConsumer extends DefaultConsumer {
             topicCountMap.put(endpoint.getTopic(), 
endpoint.getConsumerStreams());
             Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = 
consumer.createMessageStreams(topicCountMap);
             List<KafkaStream<byte[], byte[]>> streams = 
consumerMap.get(endpoint.getTopic());
+
+            // commit periodically
             if (endpoint.isAutoCommitEnable() != null && 
!endpoint.isAutoCommitEnable()) {
-                if ((endpoint.getConsumerTimeoutMs() == null || 
endpoint.getConsumerTimeoutMs().intValue() < 0)
+                if ((endpoint.getConsumerTimeoutMs() == null || 
endpoint.getConsumerTimeoutMs() < 0)
                         && endpoint.getConsumerStreams() > 1) {
                     LOG.warn("consumerTimeoutMs is set to -1 (infinite) while 
requested multiple consumer streams.");
                 }
@@ -90,8 +94,9 @@ public class KafkaConsumer extends DefaultConsumer {
                 }
                 consumerBarriers.put(consumer, barrier);
             } else {
+                // auto commit
                 for (final KafkaStream<byte[], byte[]> stream : streams) {
-                    executor.submit(new AutoCommitConsumerTask(stream));
+                    executor.submit(new AutoCommitConsumerTask(consumer, 
stream));
                 }
                 consumerBarriers.put(consumer, null);
             }
@@ -103,11 +108,14 @@ public class KafkaConsumer extends DefaultConsumer {
     protected void doStop() throws Exception {
         super.doStop();
         log.info("Stopping Kafka consumer");
+
         for (ConsumerConnector consumer : consumerBarriers.keySet()) {
             if (consumer != null) {
                 consumer.shutdown();
             }
         }
+        consumerBarriers.clear();
+
         if (executor != null) {
             if (getEndpoint() != null && getEndpoint().getCamelContext() != 
null) {
                 
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
@@ -175,7 +183,7 @@ public class KafkaConsumer extends DefaultConsumer {
 
     class CommitOffsetTask implements Runnable {
 
-        private ConsumerConnector consumer;
+        private final ConsumerConnector consumer;
 
         public CommitOffsetTask(ConsumerConnector consumer) {
             this.consumer = consumer;
@@ -183,22 +191,25 @@ public class KafkaConsumer extends DefaultConsumer {
 
         @Override
         public void run() {
+            LOG.debug("Commit offsets on consumer: {}", 
ObjectHelper.getIdentityHashCode(consumer));
             consumer.commitOffsets();
         }
     }
 
     class AutoCommitConsumerTask implements Runnable {
 
+        private final ConsumerConnector consumer;
         private KafkaStream<byte[], byte[]> stream;
 
-        public AutoCommitConsumerTask(KafkaStream<byte[], byte[]> stream) {
+        public AutoCommitConsumerTask(ConsumerConnector consumer, 
KafkaStream<byte[], byte[]> stream) {
+            this.consumer = consumer;
             this.stream = stream;
         }
 
         public void run() {
             ConsumerIterator<byte[], byte[]> it = stream.iterator();
             // only poll the next message if we are allowed to run and are not 
suspending
-            while (isRunAllowed() && it.hasNext()) {
+            while (isRunAllowed() && !isSuspendingOrSuspended() && 
it.hasNext()) {
                 MessageAndMetadata<byte[], byte[]> mm = it.next();
                 Exchange exchange = endpoint.createKafkaExchange(mm);
                 try {
@@ -207,6 +218,9 @@ public class KafkaConsumer extends DefaultConsumer {
                     getExceptionHandler().handleException("Error during 
processing", exchange, e);
                 }
             }
+            // no more data so commit offset
+            LOG.debug("Commit offsets on consumer: {}", 
ObjectHelper.getIdentityHashCode(consumer));
+            consumer.commitOffsets();
         }
     }
 }

Reply via email to