CAMEL-8756: camel-kafka consumer should not poll messages if being suspended.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/14efed0c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/14efed0c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/14efed0c Branch: refs/heads/master Commit: 14efed0c9fc44659883a15212fa6750ee5404475 Parents: 8593f26 Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Aug 7 16:21:38 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Aug 7 16:21:38 2015 +0200 ---------------------------------------------------------------------- .../apache/camel/support/ServiceSupport.java | 4 +++ .../camel/component/kafka/KafkaConsumer.java | 26 +++++++++----------- 2 files changed, 15 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/14efed0c/camel-core/src/main/java/org/apache/camel/support/ServiceSupport.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/support/ServiceSupport.java b/camel-core/src/main/java/org/apache/camel/support/ServiceSupport.java index 976db03..6cf6105 100644 --- a/camel-core/src/main/java/org/apache/camel/support/ServiceSupport.java +++ b/camel-core/src/main/java/org/apache/camel/support/ServiceSupport.java @@ -247,6 +247,10 @@ public abstract class ServiceSupport implements StatefulService { return stopping.get() || stopped.get(); } + public boolean isSuspendingOrSuspended() { + return suspending.get() || suspended.get(); + } + /** * Implementations override this method to support customized start/stop. * <p/> http://git-wip-us.apache.org/repos/asf/camel/blob/14efed0c/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index 46d258d..1657fa7 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -20,11 +20,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; @@ -134,14 +132,14 @@ public class KafkaConsumer extends DefaultConsumer { int processed = 0; boolean consumerTimeout; - MessageAndMetadata<byte[], byte[]> mm = null; + MessageAndMetadata<byte[], byte[]> mm; ConsumerIterator<byte[], byte[]> it = stream.iterator(); boolean hasNext = true; while (hasNext) { - try { consumerTimeout = false; - if (it.hasNext()) { + // only poll the next message if we are allowed to run and are not suspending + if (isRunAllowed() && !isSuspendingOrSuspended() && it.hasNext()) { mm = it.next(); Exchange exchange = endpoint.createKafkaExchange(mm); try { @@ -155,7 +153,7 @@ public class KafkaConsumer extends DefaultConsumer { hasNext = false; } } catch (ConsumerTimeoutException e) { - LOG.debug(e.getMessage(), e); + LOG.debug("Consumer timeout occurred due " + e.getMessage(), e); consumerTimeout = true; } @@ -166,14 +164,9 @@ public class KafkaConsumer extends DefaultConsumer { if (!consumerTimeout) { processed = 0; } - } catch (InterruptedException e) { - LOG.error(e.getMessage(), e); - break; - } catch (BrokenBarrierException e) { - LOG.error(e.getMessage(), e); + } catch (Exception e) { + getExceptionHandler().handleException("Error waiting for batch to complete", e); break; - } catch (TimeoutException e) { - LOG.error(e.getMessage(), e); } } } @@ -203,12 +196,15 @@ public class KafkaConsumer extends DefaultConsumer { } public void run() { - for (MessageAndMetadata<byte[], byte[]> mm : stream) { + ConsumerIterator<byte[], byte[]> it = stream.iterator(); + // only poll the next message if we are allowed to run and are not suspending + while (isRunAllowed() && it.hasNext()) { + MessageAndMetadata<byte[], byte[]> mm = it.next(); Exchange exchange = endpoint.createKafkaExchange(mm); try { processor.process(exchange); } catch (Exception e) { - LOG.error(e.getMessage(), e); + getExceptionHandler().handleException("Error during processing", exchange, e); } } }