CAMEL-6390: Suspended seda consumers should break out if there are no tasks on 
the queue, and should also break out if forced to shut down.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2806e816
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2806e816
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2806e816

Branch: refs/heads/camel-2.11.x
Commit: 2806e816d05c994f6f5c9d18124d920cf3694bcf
Parents: 5438802
Author: Claus Ibsen <davscl...@apache.org>
Authored: Thu May 30 11:11:33 2013 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Thu May 30 11:11:55 2013 +0200

----------------------------------------------------------------------
 .../apache/camel/component/seda/SedaConsumer.java  |   28 +++++++++++----
 1 files changed, 21 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2806e816/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
index 1b7f42b..bd5b2c3 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
@@ -59,6 +59,7 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable,
     private final AtomicInteger taskCount = new AtomicInteger();
     private volatile CountDownLatch latch;
     private volatile boolean shutdownPending;
+    private volatile boolean forceShutdown;
     private SedaEndpoint endpoint;
     private AsyncProcessor processor;
     private ExecutorService executor;
@@ -110,6 +111,7 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable,
     public void prepareShutdown(boolean forced) {
         // signal we want to shutdown
         shutdownPending = true;
+        forceShutdown = forced;
 
         if (latch != null) {
             LOG.debug("Preparing to shutdown, waiting for {} consumer threads 
to complete.", latch.getCount());
@@ -125,6 +127,11 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable,
 
     @Override
     public boolean isRunAllowed() {
+        // if we force shutdown then do not allow running anymore
+        if (forceShutdown) {
+            return false;
+        }
+
         if (isSuspending() || isSuspended()) {
             // allow to run even if we are suspended as we want to
             // keep the thread task running
@@ -161,14 +168,20 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable,
 
             // do not poll if we are suspended
             if (isSuspending() || isSuspended()) {
-                LOG.trace("Consumer is suspended so skip polling");
-                try {
-                    // sleep at most 1 sec
-                    Thread.sleep(Math.min(pollTimeout, 1000));
-                } catch (InterruptedException e) {
-                    LOG.debug("Sleep interrupted, are we stopping? {}", 
isStopping() || isStopped());
+                if (shutdownPending && queue.isEmpty()) {
+                    LOG.trace("Consumer is suspended and shutdown is pending, 
so this consumer thread is breaking out because the task queue is empty.");
+                    // we want to shutdown so break out if the queue is empty
+                    break;
+                } else {
+                    LOG.trace("Consumer is suspended so skip polling");
+                    try {
+                        // sleep at most 1 sec
+                        Thread.sleep(Math.min(pollTimeout, 1000));
+                    } catch (InterruptedException e) {
+                        LOG.debug("Sleep interrupted, are we stopping? {}", 
isStopping() || isStopped());
+                    }
+                    continue;
                 }
-                continue;
             }
 
             Exchange exchange = null;
@@ -285,6 +298,7 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable,
     protected void doStart() throws Exception {
         latch = new CountDownLatch(endpoint.getConcurrentConsumers());
         shutdownPending = false;
+        forceShutdown = false;
 
         setupTasks();
         endpoint.onStarted(this);

Reply via email to