Updated Branches: refs/heads/camel-2.11.x 5438802c8 -> 2806e816d refs/heads/master 4e83d8b37 -> 87b4e7024
CAMEL-6390: Suspended seda consumers should break out if no tasks on queue. And also break out if forced to shutdown. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/87b4e702 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/87b4e702 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/87b4e702 Branch: refs/heads/master Commit: 87b4e7024cebeaba878a5c5d2a72b9575adfe017 Parents: 4e83d8b Author: Claus Ibsen <davscl...@apache.org> Authored: Thu May 30 11:11:33 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu May 30 11:11:33 2013 +0200 ---------------------------------------------------------------------- .../apache/camel/component/seda/SedaConsumer.java | 28 +++++++++++---- 1 files changed, 21 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/87b4e702/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java index ac26109..2c8f476 100644 --- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java @@ -58,6 +58,7 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, private final AtomicInteger taskCount = new AtomicInteger(); private volatile CountDownLatch latch; private volatile boolean shutdownPending; + private volatile boolean forceShutdown; private SedaEndpoint endpoint; private AsyncProcessor processor; private ExecutorService executor; @@ -109,6 +110,7 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, public void prepareShutdown(boolean forced) { // signal we want to shutdown shutdownPending = true; + forceShutdown = forced; if (latch != null) { LOG.debug("Preparing to shutdown, waiting for {} consumer threads to complete.", latch.getCount()); @@ -124,6 +126,11 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, @Override public boolean isRunAllowed() { + // if we force shutdown then do not allow running anymore + if (forceShutdown) { + return false; + } + if (isSuspending() || isSuspended()) { // allow to run even if we are suspended as we want to // keep the thread task running @@ -160,14 +167,20 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, // do not poll if we are suspended if (isSuspending() || isSuspended()) { - LOG.trace("Consumer is suspended so skip polling"); - try { - // sleep at most 1 sec - Thread.sleep(Math.min(pollTimeout, 1000)); - } catch (InterruptedException e) { - LOG.debug("Sleep interrupted, are we stopping? {}", isStopping() || isStopped()); + if (shutdownPending && queue.isEmpty()) { + LOG.trace("Consumer is suspended and shutdown is pending, so this consumer thread is breaking out because the task queue is empty."); + // we want to shutdown so break out if there queue is empty + break; + } else { + LOG.trace("Consumer is suspended so skip polling"); + try { + // sleep at most 1 sec + Thread.sleep(Math.min(pollTimeout, 1000)); + } catch (InterruptedException e) { + LOG.debug("Sleep interrupted, are we stopping? {}", isStopping() || isStopped()); + } + continue; } - continue; } Exchange exchange = null; @@ -284,6 +297,7 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, protected void doStart() throws Exception { latch = new CountDownLatch(endpoint.getConcurrentConsumers()); shutdownPending = false; + forceShutdown = false; setupTasks(); endpoint.onStarted(this);