Repository: camel
Updated Branches:
  refs/heads/master a0ae974f9 -> 95488a5a6


CAMEL-8636 Committed the last batch of message when  the auto commit is false


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/95488a5a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/95488a5a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/95488a5a

Branch: refs/heads/master
Commit: 95488a5a65f7dcb03a87adfe605f27e3ec93f736
Parents: a0ae974
Author: Willem Jiang <willem.ji...@gmail.com>
Authored: Wed Apr 15 14:39:41 2015 +0800
Committer: Willem Jiang <willem.ji...@gmail.com>
Committed: Wed Apr 15 18:57:04 2015 +0800

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaConsumer.java    | 24 +++++++++++---------
 1 file changed, 13 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/95488a5a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index d6b49d2..46d258d 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -136,29 +136,31 @@ public class KafkaConsumer extends DefaultConsumer {
             boolean consumerTimeout;
             MessageAndMetadata<byte[], byte[]> mm = null;
             ConsumerIterator<byte[], byte[]> it = stream.iterator();
-
-            while (true) {
+            boolean hasNext = true;
+            while (hasNext) {
 
                 try {
                     consumerTimeout = false;
                     if (it.hasNext()) {
                         mm = it.next();
+                        Exchange exchange = endpoint.createKafkaExchange(mm);
+                        try {
+                            processor.process(exchange);
+                        } catch (Exception e) {
+                            LOG.error(e.getMessage(), e);
+                        }
+                        processed++;
                     } else {
-                        break;
-                    }
-                    Exchange exchange = endpoint.createKafkaExchange(mm);
-                    try {
-                        processor.process(exchange);
-                    } catch (Exception e) {
-                        LOG.error(e.getMessage(), e);
+                        // we don't need to process the message
+                        hasNext = false;
                     }
-                    processed++;
                 } catch (ConsumerTimeoutException e) {
                     LOG.debug(e.getMessage(), e);
                     consumerTimeout = true;
                 }
 
-                if (processed >= endpoint.getBatchSize() || consumerTimeout) {
+                if (processed >= endpoint.getBatchSize() || consumerTimeout 
+                    || (processed > 0 && !hasNext)) { // Need to commit the 
offset for the last round
                     try {
                         berrier.await(endpoint.getBarrierAwaitTimeoutMs(), 
TimeUnit.MILLISECONDS);
                         if (!consumerTimeout) {

Reply via email to