Author: davsclaus
Date: Wed Nov 24 19:36:19 2010
New Revision: 1038780

URL: http://svn.apache.org/viewvc?rev=1038780&view=rev
Log:
CAMEL-3348: Fixed issue with stopping the seda consumer, ensuring all exchanges 
on the queues are processed during graceful shutdown.

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/FileSedaShutdownCompleteAllTasksTest.java
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownAware.java
    
camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java
    
camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
    
camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java?rev=1038780&r1=1038779&r2=1038780&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
 Wed Nov 24 19:36:19 2010
@@ -76,4 +76,8 @@ public class DirectConsumer extends Defa
         // of inflight messages. 
         return 0;
     }
+
+    public void prepareShutdown() {
+        // noop
+    }
 }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1038780&r1=1038779&r2=1038780&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
 Wed Nov 24 19:36:19 2010
@@ -180,6 +180,10 @@ public abstract class GenericFileConsume
         }
     }
 
+    public void prepareShutdown() {
+        // noop
+    }
+
     public boolean isBatchAllowed() {
         // stop if we are not running
         boolean answer = isRunAllowed();

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=1038780&r1=1038779&r2=1038780&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
 Wed Nov 24 19:36:19 2010
@@ -17,9 +17,9 @@
 package org.apache.camel.component.seda;
 
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
@@ -46,11 +46,8 @@ import org.apache.commons.logging.LogFac
 public class SedaConsumer extends ServiceSupport implements Consumer, 
Runnable, ShutdownAware {
     private static final transient Log LOG = 
LogFactory.getLog(SedaConsumer.class);
 
-    // use a task counter to help ensure we can graceful shutdown the seda 
consumers without
-    // causing any exchanges to be lost due a tiny loophole between the 
exchange is polled
-    // and when its registered as in flight exchange
-    private final AtomicInteger tasks = new AtomicInteger();
-    private volatile boolean pendingStop;
+    private CountDownLatch latch;
+    private volatile boolean shutdownPending;
     private SedaEndpoint endpoint;
     private AsyncProcessor processor;
     private ExecutorService executor;
@@ -93,41 +90,29 @@ public class SedaConsumer extends Servic
 
     public int getPendingExchangesSize() {
         // number of pending messages on the queue
-        int answer = endpoint.getQueue().size();
-        if (answer == 0) {
-            // signal we want to stop
-            pendingStop = true;
-
-            // if there are no pending exchanges we at first must ensure that
-            // all tasks has been completed and the thread is stopped, to avoid
-            // any condition which otherwise would cause an exchange to be lost
-
-            // we think there are 0 pending exchanges but we are only 100% sure
-            // when all the running tasks has been shutdown, so they do not
-            // somehow have polled an Exchange which we otherwise may loose
-            // due the Exchange takes a little while before its enlisted in the
-            // in flight registry (to let Camel know there is an Exchange in 
progress)
-            answer = tasks.get();
+        return endpoint.getQueue().size();
+    }
+
+    public void prepareShutdown() {
+        // signal we want to shutdown
+        shutdownPending = true;
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Preparing to shutdown, waiting for " + latch.getCount() 
+ " consumer threads to complete.");
         }
 
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("Pending exchanges " + answer);
+        // wait for all threads to end
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+            // ignore
         }
-        return answer;
     }
 
     public void run() {
-        tasks.incrementAndGet();
-
         BlockingQueue<Exchange> queue = endpoint.getQueue();
-        while (queue != null && isRunAllowed()) {
-
-            // we are done if there are no pending exchanges and we want to 
stop
-            if (pendingStop && endpoint.getQueue().size() == 0) {
-                // no more pending exchanges and we want to stop so break out
-                break;
-            }
-
+        // loop while we are allowed, or if we are stopping loop until the 
queue is empty
+        while (queue != null && (isRunAllowed())) {
             Exchange exchange = null;
             try {
                 exchange = queue.poll(1000, TimeUnit.MILLISECONDS);
@@ -142,6 +127,12 @@ public class SedaConsumer extends Servic
                     } catch (Exception e) {
                         getExceptionHandler().handleException("Error 
processing exchange", exchange, e);
                     }
+                } else if (shutdownPending && queue.isEmpty()) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Shutdown is pending, so this consumer 
thread is breaking out because the task queue is empty.");
+                    }
+                    // we want to shutdown so break out if there queue is empty
+                    break;
                 }
             } catch (InterruptedException e) {
                 if (LOG.isDebugEnabled()) {
@@ -157,10 +148,9 @@ public class SedaConsumer extends Servic
             }
         }
 
-        tasks.decrementAndGet();
-
+        latch.countDown();
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Ending this polling consumer thread, there are still " 
+ tasks.get() + " threads left.");
+            LOG.debug("Ending this polling consumer thread, there are still " 
+ latch.getCount() + " consumer threads left.");
         }
     }
 
@@ -205,9 +195,8 @@ public class SedaConsumer extends Servic
     }
 
     protected void doStart() throws Exception {
-        // reset state
-        pendingStop = false;
-        tasks.set(0);
+        latch = new CountDownLatch(endpoint.getConcurrentConsumers());
+        shutdownPending = false;
 
         int poolSize = endpoint.getConcurrentConsumers();
         executor = endpoint.getCamelContext().getExecutorServiceStrategy()

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java?rev=1038780&r1=1038779&r2=1038780&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
 Wed Nov 24 19:36:19 2010
@@ -97,6 +97,9 @@ public class SedaProducer extends Collec
                 }
             });
 
+            if (log.isTraceEnabled()) {
+                log.trace("Adding Exchange to queue: " + copy);
+            }
             queue.add(copy);
 
             if (timeout > 0) {
@@ -128,6 +131,9 @@ public class SedaProducer extends Collec
             }
         } else {
             // no wait, eg its a InOnly then just add to queue and return
+            if (log.isTraceEnabled()) {
+                log.trace("Adding Exchange to queue: " + copy);
+            }
             queue.add(copy);
         }
 

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java?rev=1038780&r1=1038779&r2=1038780&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
 Wed Nov 24 19:36:19 2010
@@ -432,13 +432,26 @@ public class DefaultShutdownStrategy ext
                 }
             }
 
+            // prepare for shutdown
+            for (ShutdownDeferredConsumer deferred : deferredConsumers) {
+                Consumer consumer = deferred.getConsumer();
+                if (consumer instanceof ShutdownAware) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Route: " + deferred.getRoute().getId() + " 
preparing to shutdown.");
+                    }
+                    ((ShutdownAware) consumer).prepareShutdown();
+                    LOG.info("Route: " + deferred.getRoute().getId() + " 
preparing to shutdown complete.");
+                }
+            }
+
             // now all messages has been completed then stop the deferred 
consumers
             for (ShutdownDeferredConsumer deferred : deferredConsumers) {
+                Consumer consumer = deferred.getConsumer();
                 if (suspendOnly) {
-                    suspendNow(deferred.getConsumer());
+                    suspendNow(consumer);
                     LOG.info("Route: " + deferred.getRoute().getId() + " 
suspend complete.");
                 } else {
-                    shutdownNow(deferred.getConsumer());
+                    shutdownNow(consumer);
                     LOG.info("Route: " + deferred.getRoute().getId() + " 
shutdown complete.");
                 }
             }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownAware.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownAware.java?rev=1038780&r1=1038779&r2=1038780&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownAware.java 
(original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownAware.java 
Wed Nov 24 19:36:19 2010
@@ -52,4 +52,11 @@ public interface ShutdownAware {
      * @return number of pending exchanges
      */
     int getPendingExchangesSize();
+
+    /**
+     * Prepares the consumer for shutdown.
+     * <p/>
+     * For example by graceful stopping any threads or the likes.
+     */
+    void prepareShutdown();
 }

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/FileSedaShutdownCompleteAllTasksTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/FileSedaShutdownCompleteAllTasksTest.java?rev=1038780&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/FileSedaShutdownCompleteAllTasksTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/FileSedaShutdownCompleteAllTasksTest.java
 Wed Nov 24 19:36:19 2010
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.ShutdownRunningTask;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version $Revision$
+ */
+public class FileSedaShutdownCompleteAllTasksTest extends ContextTestSupport {
+
+    public void testShutdownCompleteAllTasks() throws Exception {
+        deleteDirectory("target/seda");
+
+        final String url = "file:target/seda";
+        template.sendBodyAndHeader(url, "A", Exchange.FILE_NAME, "a.txt");
+        template.sendBodyAndHeader(url, "B", Exchange.FILE_NAME, "b.txt");
+        template.sendBodyAndHeader(url, "C", Exchange.FILE_NAME, "c.txt");
+        template.sendBodyAndHeader(url, "D", Exchange.FILE_NAME, "d.txt");
+        template.sendBodyAndHeader(url, "E", Exchange.FILE_NAME, "e.txt");
+        
+        // give it 20 seconds to shutdown
+        context.getShutdownStrategy().setTimeout(20);
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from(url).routeId("route1")
+                    // let it complete all tasks during shutdown
+                    .shutdownRunningTask(ShutdownRunningTask.CompleteAllTasks)
+                    .to("log:delay")
+                    .delay(1000).to("seda:foo");
+
+                from("seda:foo").routeId("route2")
+                    .to("log:bar")
+                    .to("mock:bar");
+            }
+        });
+        context.start();
+
+        MockEndpoint bar = getMockEndpoint("mock:bar");
+        bar.expectedMinimumMessageCount(1);
+
+        assertMockEndpointsSatisfied();
+
+        // shutdown during processing
+        context.stop();
+
+        // should route all 5
+        assertEquals("Should complete all messages", 5, 
bar.getReceivedCounter());
+    }
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+}

Modified: 
camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java?rev=1038780&r1=1038779&r2=1038780&view=diff
==============================================================================
--- 
camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java
 (original)
+++ 
camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java
 Wed Nov 24 19:36:19 2010
@@ -240,6 +240,10 @@ public class IBatisPollingConsumer exten
         }
     }
 
+    public void prepareShutdown() {
+        // noop
+    }
+
     public boolean isBatchAllowed() {
         // stop if we are not running
         boolean answer = isRunAllowed();

Modified: 
camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java?rev=1038780&r1=1038779&r2=1038780&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
 (original)
+++ 
camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
 Wed Nov 24 19:36:19 2010
@@ -166,6 +166,10 @@ public class JpaConsumer extends Schedul
         }
     }
 
+    public void prepareShutdown() {
+        // noop
+    }
+
     public boolean isBatchAllowed() {
         // stop if we are not running
         boolean answer = isRunAllowed();

Modified: 
camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java?rev=1038780&r1=1038779&r2=1038780&view=diff
==============================================================================
--- 
camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
 (original)
+++ 
camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
 Wed Nov 24 19:36:19 2010
@@ -203,6 +203,10 @@ public class MailConsumer extends Schedu
         }
     }
 
+    public void prepareShutdown() {
+        // noop
+    }
+
     public boolean isBatchAllowed() {
         // stop if we are not running
         boolean answer = isRunAllowed();


Reply via email to