Author: davsclaus Date: Wed Nov 24 19:36:19 2010 New Revision: 1038780 URL: http://svn.apache.org/viewvc?rev=1038780&view=rev Log: CAMEL-3348: Fixed issue stopping seda consumer ensuring all exchanges on queues are processed during graceful shutdown.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/FileSedaShutdownCompleteAllTasksTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownAware.java camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java?rev=1038780&r1=1038779&r2=1038780&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java Wed Nov 24 19:36:19 2010 @@ -76,4 +76,8 @@ public class DirectConsumer extends Defa // of inflight messages. 
return 0; } + + public void prepareShutdown() { + // noop + } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1038780&r1=1038779&r2=1038780&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Wed Nov 24 19:36:19 2010 @@ -180,6 +180,10 @@ public abstract class GenericFileConsume } } + public void prepareShutdown() { + // noop + } + public boolean isBatchAllowed() { // stop if we are not running boolean answer = isRunAllowed(); Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=1038780&r1=1038779&r2=1038780&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Wed Nov 24 19:36:19 2010 @@ -17,9 +17,9 @@ package org.apache.camel.component.seda; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; @@ -46,11 +46,8 @@ import org.apache.commons.logging.LogFac public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, ShutdownAware { private static final transient Log LOG = 
LogFactory.getLog(SedaConsumer.class); - // use a task counter to help ensure we can graceful shutdown the seda consumers without - // causing any exchanges to be lost due a tiny loophole between the exchange is polled - // and when its registered as in flight exchange - private final AtomicInteger tasks = new AtomicInteger(); - private volatile boolean pendingStop; + private CountDownLatch latch; + private volatile boolean shutdownPending; private SedaEndpoint endpoint; private AsyncProcessor processor; private ExecutorService executor; @@ -93,41 +90,29 @@ public class SedaConsumer extends Servic public int getPendingExchangesSize() { // number of pending messages on the queue - int answer = endpoint.getQueue().size(); - if (answer == 0) { - // signal we want to stop - pendingStop = true; - - // if there are no pending exchanges we at first must ensure that - // all tasks has been completed and the thread is stopped, to avoid - // any condition which otherwise would cause an exchange to be lost - - // we think there are 0 pending exchanges but we are only 100% sure - // when all the running tasks has been shutdown, so they do not - // somehow have polled an Exchange which we otherwise may loose - // due the Exchange takes a little while before its enlisted in the - // in flight registry (to let Camel know there is an Exchange in progress) - answer = tasks.get(); + return endpoint.getQueue().size(); + } + + public void prepareShutdown() { + // signal we want to shutdown + shutdownPending = true; + + if (LOG.isDebugEnabled()) { + LOG.debug("Preparing to shutdown, waiting for " + latch.getCount() + " consumer threads to complete."); } - if (LOG.isTraceEnabled()) { - LOG.trace("Pending exchanges " + answer); + // wait for all threads to end + try { + latch.await(); + } catch (InterruptedException e) { + // ignore } - return answer; } public void run() { - tasks.incrementAndGet(); - BlockingQueue<Exchange> queue = endpoint.getQueue(); - while (queue != null && 
isRunAllowed()) { - - // we are done if there are no pending exchanges and we want to stop - if (pendingStop && endpoint.getQueue().size() == 0) { - // no more pending exchanges and we want to stop so break out - break; - } - + // loop while we are allowed, or if we are stopping loop until the queue is empty + while (queue != null && (isRunAllowed())) { Exchange exchange = null; try { exchange = queue.poll(1000, TimeUnit.MILLISECONDS); @@ -142,6 +127,12 @@ public class SedaConsumer extends Servic } catch (Exception e) { getExceptionHandler().handleException("Error processing exchange", exchange, e); } + } else if (shutdownPending && queue.isEmpty()) { + if (LOG.isTraceEnabled()) { + LOG.trace("Shutdown is pending, so this consumer thread is breaking out because the task queue is empty."); + } + // we want to shutdown so break out if there queue is empty + break; } } catch (InterruptedException e) { if (LOG.isDebugEnabled()) { @@ -157,10 +148,9 @@ public class SedaConsumer extends Servic } } - tasks.decrementAndGet(); - + latch.countDown(); if (LOG.isDebugEnabled()) { - LOG.debug("Ending this polling consumer thread, there are still " + tasks.get() + " threads left."); + LOG.debug("Ending this polling consumer thread, there are still " + latch.getCount() + " consumer threads left."); } } @@ -205,9 +195,8 @@ public class SedaConsumer extends Servic } protected void doStart() throws Exception { - // reset state - pendingStop = false; - tasks.set(0); + latch = new CountDownLatch(endpoint.getConcurrentConsumers()); + shutdownPending = false; int poolSize = endpoint.getConcurrentConsumers(); executor = endpoint.getCamelContext().getExecutorServiceStrategy() Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java?rev=1038780&r1=1038779&r2=1038780&view=diff 
============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java Wed Nov 24 19:36:19 2010 @@ -97,6 +97,9 @@ public class SedaProducer extends Collec } }); + if (log.isTraceEnabled()) { + log.trace("Adding Exchange to queue: " + copy); + } queue.add(copy); if (timeout > 0) { @@ -128,6 +131,9 @@ public class SedaProducer extends Collec } } else { // no wait, eg its a InOnly then just add to queue and return + if (log.isTraceEnabled()) { + log.trace("Adding Exchange to queue: " + copy); + } queue.add(copy); } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java?rev=1038780&r1=1038779&r2=1038780&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java Wed Nov 24 19:36:19 2010 @@ -432,13 +432,26 @@ public class DefaultShutdownStrategy ext } } + // prepare for shutdown + for (ShutdownDeferredConsumer deferred : deferredConsumers) { + Consumer consumer = deferred.getConsumer(); + if (consumer instanceof ShutdownAware) { + if (LOG.isDebugEnabled()) { + LOG.debug("Route: " + deferred.getRoute().getId() + " preparing to shutdown."); + } + ((ShutdownAware) consumer).prepareShutdown(); + LOG.info("Route: " + deferred.getRoute().getId() + " preparing to shutdown complete."); + } + } + // now all messages has been completed then stop the deferred consumers for (ShutdownDeferredConsumer deferred : deferredConsumers) { + Consumer consumer = deferred.getConsumer(); if (suspendOnly) { - 
suspendNow(deferred.getConsumer()); + suspendNow(consumer); LOG.info("Route: " + deferred.getRoute().getId() + " suspend complete."); } else { - shutdownNow(deferred.getConsumer()); + shutdownNow(consumer); LOG.info("Route: " + deferred.getRoute().getId() + " shutdown complete."); } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownAware.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownAware.java?rev=1038780&r1=1038779&r2=1038780&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownAware.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownAware.java Wed Nov 24 19:36:19 2010 @@ -52,4 +52,11 @@ public interface ShutdownAware { * @return number of pending exchanges */ int getPendingExchangesSize(); + + /** + * Prepares the consumer for shutdown. + * <p/> + * For example by graceful stopping any threads or the likes. + */ + void prepareShutdown(); } Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/FileSedaShutdownCompleteAllTasksTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/FileSedaShutdownCompleteAllTasksTest.java?rev=1038780&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/FileSedaShutdownCompleteAllTasksTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/FileSedaShutdownCompleteAllTasksTest.java Wed Nov 24 19:36:19 2010 @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.seda; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.ShutdownRunningTask; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +/** + * @version $Revision$ + */ +public class FileSedaShutdownCompleteAllTasksTest extends ContextTestSupport { + + public void testShutdownCompleteAllTasks() throws Exception { + deleteDirectory("target/seda"); + + final String url = "file:target/seda"; + template.sendBodyAndHeader(url, "A", Exchange.FILE_NAME, "a.txt"); + template.sendBodyAndHeader(url, "B", Exchange.FILE_NAME, "b.txt"); + template.sendBodyAndHeader(url, "C", Exchange.FILE_NAME, "c.txt"); + template.sendBodyAndHeader(url, "D", Exchange.FILE_NAME, "d.txt"); + template.sendBodyAndHeader(url, "E", Exchange.FILE_NAME, "e.txt"); + + // give it 20 seconds to shutdown + context.getShutdownStrategy().setTimeout(20); + + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from(url).routeId("route1") + // let it complete all tasks during shutdown + .shutdownRunningTask(ShutdownRunningTask.CompleteAllTasks) + .to("log:delay") + .delay(1000).to("seda:foo"); + + from("seda:foo").routeId("route2") + .to("log:bar") + .to("mock:bar"); + } + }); + context.start(); + + MockEndpoint bar = getMockEndpoint("mock:bar"); + 
bar.expectedMinimumMessageCount(1); + + assertMockEndpointsSatisfied(); + + // shutdown during processing + context.stop(); + + // should route all 5 + assertEquals("Should complete all messages", 5, bar.getReceivedCounter()); + } + + @Override + public boolean isUseRouteBuilder() { + return false; + } +} Modified: camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java?rev=1038780&r1=1038779&r2=1038780&view=diff ============================================================================== --- camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java (original) +++ camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java Wed Nov 24 19:36:19 2010 @@ -240,6 +240,10 @@ public class IBatisPollingConsumer exten } } + public void prepareShutdown() { + // noop + } + public boolean isBatchAllowed() { // stop if we are not running boolean answer = isRunAllowed(); Modified: camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java?rev=1038780&r1=1038779&r2=1038780&view=diff ============================================================================== --- camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java (original) +++ camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java Wed Nov 24 19:36:19 2010 @@ -166,6 +166,10 @@ public class JpaConsumer extends Schedul } } + public void prepareShutdown() { + // noop + } + public boolean isBatchAllowed() { // stop if we are not running boolean answer = isRunAllowed(); Modified: 
camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java?rev=1038780&r1=1038779&r2=1038780&view=diff ============================================================================== --- camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java (original) +++ camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java Wed Nov 24 19:36:19 2010 @@ -203,6 +203,10 @@ public class MailConsumer extends Schedu } } + public void prepareShutdown() { + // noop + } + public boolean isBatchAllowed() { // stop if we are not running boolean answer = isRunAllowed();