CAMEL-8756: camel-kafka consumer should not poll messages if being suspended.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/14efed0c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/14efed0c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/14efed0c

Branch: refs/heads/master
Commit: 14efed0c9fc44659883a15212fa6750ee5404475
Parents: 8593f26
Author: Claus Ibsen <davscl...@apache.org>
Authored: Fri Aug 7 16:21:38 2015 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Fri Aug 7 16:21:38 2015 +0200

----------------------------------------------------------------------
 .../apache/camel/support/ServiceSupport.java    |  4 +++
 .../camel/component/kafka/KafkaConsumer.java    | 26 +++++++++-----------
 2 files changed, 15 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/14efed0c/camel-core/src/main/java/org/apache/camel/support/ServiceSupport.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/support/ServiceSupport.java b/camel-core/src/main/java/org/apache/camel/support/ServiceSupport.java
index 976db03..6cf6105 100644
--- a/camel-core/src/main/java/org/apache/camel/support/ServiceSupport.java
+++ b/camel-core/src/main/java/org/apache/camel/support/ServiceSupport.java
@@ -247,6 +247,10 @@ public abstract class ServiceSupport implements StatefulService {
         return stopping.get() || stopped.get();
     }
 
+    public boolean isSuspendingOrSuspended() {
+        return suspending.get() || suspended.get();
+    }
+
     /**
      * Implementations override this method to support customized start/stop.
      * <p/>

http://git-wip-us.apache.org/repos/asf/camel/blob/14efed0c/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 46d258d..1657fa7 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -20,11 +20,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import kafka.consumer.ConsumerConfig;
 import kafka.consumer.ConsumerIterator;
@@ -134,14 +132,14 @@ public class KafkaConsumer extends DefaultConsumer {
 
             int processed = 0;
             boolean consumerTimeout;
-            MessageAndMetadata<byte[], byte[]> mm = null;
+            MessageAndMetadata<byte[], byte[]> mm;
             ConsumerIterator<byte[], byte[]> it = stream.iterator();
             boolean hasNext = true;
             while (hasNext) {
-
                 try {
                     consumerTimeout = false;
-                    if (it.hasNext()) {
+                    // only poll the next message if we are allowed to run and are not suspending
+                    if (isRunAllowed() && !isSuspendingOrSuspended() && it.hasNext()) {
                         mm = it.next();
                         Exchange exchange = endpoint.createKafkaExchange(mm);
                         try {
@@ -155,7 +153,7 @@ public class KafkaConsumer extends DefaultConsumer {
                         hasNext = false;
                     }
                 } catch (ConsumerTimeoutException e) {
-                    LOG.debug(e.getMessage(), e);
+                    LOG.debug("Consumer timeout occurred due " + e.getMessage(), e);
                     consumerTimeout = true;
                 }
 
@@ -166,14 +164,9 @@ public class KafkaConsumer extends DefaultConsumer {
                         if (!consumerTimeout) {
                             processed = 0;
                         }
-                    } catch (InterruptedException e) {
-                        LOG.error(e.getMessage(), e);
-                        break;
-                    } catch (BrokenBarrierException e) {
-                        LOG.error(e.getMessage(), e);
+                    } catch (Exception e) {
+                        getExceptionHandler().handleException("Error waiting for batch to complete", e);
                         break;
-                    } catch (TimeoutException e) {
-                        LOG.error(e.getMessage(), e);
                     }
                 }
             }
@@ -203,12 +196,15 @@ public class KafkaConsumer extends DefaultConsumer {
         }
 
         public void run() {
-            for (MessageAndMetadata<byte[], byte[]> mm : stream) {
+            ConsumerIterator<byte[], byte[]> it = stream.iterator();
+            // only poll the next message if we are allowed to run and are not suspending
+            while (isRunAllowed() && it.hasNext()) {
+                MessageAndMetadata<byte[], byte[]> mm = it.next();
                 Exchange exchange = endpoint.createKafkaExchange(mm);
                 try {
                     processor.process(exchange);
                 } catch (Exception e) {
-                    LOG.error(e.getMessage(), e);
+                    getExceptionHandler().handleException("Error during processing", exchange, e);
                 }
             }
         }

Reply via email to