CAMEL-6390: Suspended seda consumers should break out if no tasks on queue. And also break out if forced to shutdown.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2806e816 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2806e816 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2806e816 Branch: refs/heads/camel-2.11.x Commit: 2806e816d05c994f6f5c9d18124d920cf3694bcf Parents: 5438802 Author: Claus Ibsen <davscl...@apache.org> Authored: Thu May 30 11:11:33 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu May 30 11:11:55 2013 +0200 ---------------------------------------------------------------------- .../apache/camel/component/seda/SedaConsumer.java | 28 +++++++++++---- 1 files changed, 21 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/2806e816/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java index 1b7f42b..bd5b2c3 100644 --- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java @@ -59,6 +59,7 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, private final AtomicInteger taskCount = new AtomicInteger(); private volatile CountDownLatch latch; private volatile boolean shutdownPending; + private volatile boolean forceShutdown; private SedaEndpoint endpoint; private AsyncProcessor processor; private ExecutorService executor; @@ -110,6 +111,7 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, public void prepareShutdown(boolean forced) { // signal we want to shutdown shutdownPending = true; + forceShutdown = forced; if (latch != null) { LOG.debug("Preparing to shutdown, waiting for {} consumer threads to complete.", latch.getCount()); @@ -125,6 +127,11 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, @Override public boolean isRunAllowed() { + // if we force shutdown then do not allow running anymore + if (forceShutdown) { + return false; + } + if (isSuspending() || isSuspended()) { // allow to run even if we are suspended as we want to // keep the thread task running @@ -161,14 +168,20 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, // do not poll if we are suspended if (isSuspending() || isSuspended()) { - LOG.trace("Consumer is suspended so skip polling"); - try { - // sleep at most 1 sec - Thread.sleep(Math.min(pollTimeout, 1000)); - } catch (InterruptedException e) { - LOG.debug("Sleep interrupted, are we stopping? {}", isStopping() || isStopped()); + if (shutdownPending && queue.isEmpty()) { + LOG.trace("Consumer is suspended and shutdown is pending, so this consumer thread is breaking out because the task queue is empty."); + // we want to shutdown so break out if there queue is empty + break; + } else { + LOG.trace("Consumer is suspended so skip polling"); + try { + // sleep at most 1 sec + Thread.sleep(Math.min(pollTimeout, 1000)); + } catch (InterruptedException e) { + LOG.debug("Sleep interrupted, are we stopping? {}", isStopping() || isStopped()); + } + continue; } - continue; } Exchange exchange = null; @@ -285,6 +298,7 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, protected void doStart() throws Exception { latch = new CountDownLatch(endpoint.getConcurrentConsumers()); shutdownPending = false; + forceShutdown = false; setupTasks(); endpoint.onStarted(this);