Updated Branches:
  refs/heads/camel-2.11.x 5438802c8 -> 2806e816d
  refs/heads/master 4e83d8b37 -> 87b4e7024


CAMEL-6390: Suspended seda consumers should break out if no tasks on queue. And 
also break out if forced to shutdown.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/87b4e702
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/87b4e702
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/87b4e702

Branch: refs/heads/master
Commit: 87b4e7024cebeaba878a5c5d2a72b9575adfe017
Parents: 4e83d8b
Author: Claus Ibsen <davscl...@apache.org>
Authored: Thu May 30 11:11:33 2013 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Thu May 30 11:11:33 2013 +0200

----------------------------------------------------------------------
 .../apache/camel/component/seda/SedaConsumer.java  |   28 +++++++++++----
 1 files changed, 21 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/87b4e702/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java 
b/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
index ac26109..2c8f476 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
@@ -58,6 +58,7 @@ public class SedaConsumer extends ServiceSupport implements 
Consumer, Runnable,
     private final AtomicInteger taskCount = new AtomicInteger();
     private volatile CountDownLatch latch;
     private volatile boolean shutdownPending;
+    private volatile boolean forceShutdown;
     private SedaEndpoint endpoint;
     private AsyncProcessor processor;
     private ExecutorService executor;
@@ -109,6 +110,7 @@ public class SedaConsumer extends ServiceSupport implements 
Consumer, Runnable,
     public void prepareShutdown(boolean forced) {
         // signal we want to shutdown
         shutdownPending = true;
+        forceShutdown = forced;
 
         if (latch != null) {
             LOG.debug("Preparing to shutdown, waiting for {} consumer threads 
to complete.", latch.getCount());
@@ -124,6 +126,11 @@ public class SedaConsumer extends ServiceSupport 
implements Consumer, Runnable,
 
     @Override
     public boolean isRunAllowed() {
+        // if we force shutdown then do not allow running anymore
+        if (forceShutdown) {
+            return false;
+        }
+
         if (isSuspending() || isSuspended()) {
             // allow to run even if we are suspended as we want to
             // keep the thread task running
@@ -160,14 +167,20 @@ public class SedaConsumer extends ServiceSupport 
implements Consumer, Runnable,
 
             // do not poll if we are suspended
             if (isSuspending() || isSuspended()) {
-                LOG.trace("Consumer is suspended so skip polling");
-                try {
-                    // sleep at most 1 sec
-                    Thread.sleep(Math.min(pollTimeout, 1000));
-                } catch (InterruptedException e) {
-                    LOG.debug("Sleep interrupted, are we stopping? {}", 
isStopping() || isStopped());
+                if (shutdownPending && queue.isEmpty()) {
+                    LOG.trace("Consumer is suspended and shutdown is pending, 
so this consumer thread is breaking out because the task queue is empty.");
+                    // we want to shutdown so break out if there queue is empty
+                    break;
+                } else {
+                    LOG.trace("Consumer is suspended so skip polling");
+                    try {
+                        // sleep at most 1 sec
+                        Thread.sleep(Math.min(pollTimeout, 1000));
+                    } catch (InterruptedException e) {
+                        LOG.debug("Sleep interrupted, are we stopping? {}", 
isStopping() || isStopped());
+                    }
+                    continue;
                 }
-                continue;
             }
 
             Exchange exchange = null;
@@ -284,6 +297,7 @@ public class SedaConsumer extends ServiceSupport implements 
Consumer, Runnable,
     protected void doStart() throws Exception {
         latch = new CountDownLatch(endpoint.getConcurrentConsumers());
         shutdownPending = false;
+        forceShutdown = false;
 
         setupTasks();
         endpoint.onStarted(this);

Reply via email to