CAMEL-9150: Seda suspend/resume should not trigger start/stop logic
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e75d37b3 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e75d37b3 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e75d37b3 Branch: refs/heads/master Commit: e75d37b3918eb957924e5122672accc6c01c72fc Parents: 20ec49a Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Sep 22 09:52:37 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Sep 22 09:52:37 2015 +0200 ---------------------------------------------------------------------- .../camel/component/direct/DirectConsumer.java | 2 +- .../camel/component/seda/SedaConsumer.java | 9 ++++++++- .../camel/impl/DefaultShutdownStrategy.java | 20 ++++++++++++-------- .../impl/ScheduledBatchPollingConsumer.java | 2 +- .../camel/processor/RedeliveryErrorHandler.java | 2 +- .../processor/aggregate/AggregateProcessor.java | 2 +- .../org/apache/camel/spi/ShutdownPrepared.java | 5 +++-- ...ntextSuspendResumeRouteStartupOrderTest.java | 4 ++++ ...faultCamelContextSuspendResumeRouteTest.java | 6 ++++++ .../camel/impl/TwoRouteSuspendResumeTest.java | 3 +++ 10 files changed, 40 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e75d37b3/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java b/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java index 83dbbca..a5be34f 100644 --- a/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java @@ -82,7 +82,7 @@ public class DirectConsumer extends DefaultConsumer implements ShutdownAware, Su return 0; } - public void prepareShutdown(boolean forced) { + public void prepareShutdown(boolean suspendOnly, boolean forced) { // noop } } http://git-wip-us.apache.org/repos/asf/camel/blob/e75d37b3/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java index 14e4372..c0970fb 100644 --- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java @@ -109,7 +109,14 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, } @Override - public void prepareShutdown(boolean forced) { + public void prepareShutdown(boolean suspendOnly, boolean forced) { + // if we are suspending then we want to keep the thread running but just not route the exchange + // this logic is only when we stop or shutdown the consumer + if (suspendOnly) { + LOG.debug("Skip preparing to shutdown as consumer is being suspended"); + return; + } + // signal we want to shutdown shutdownPending = true; forceShutdown = forced; http://git-wip-us.apache.org/repos/asf/camel/blob/e75d37b3/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java index 4d1395e..0657f15 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java @@ -184,7 +184,11 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS Collections.reverse(routesOrdered); } - LOG.info("Starting to graceful shutdown " + routesOrdered.size() + " routes (timeout " + timeout + " " + timeUnit.toString().toLowerCase(Locale.ENGLISH) + ")"); + if (suspendOnly) { + LOG.info("Starting to graceful suspend " + routesOrdered.size() + " routes (timeout " + timeout + " " + timeUnit.toString().toLowerCase(Locale.ENGLISH) + ")"); + } else { + LOG.info("Starting to graceful shutdown " + routesOrdered.size() + " routes (timeout " + timeout + " " + timeUnit.toString().toLowerCase(Locale.ENGLISH) + ")"); + } // use another thread to perform the shutdowns so we can support timeout timeoutOccurred.set(false); @@ -230,7 +234,7 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS // now the route consumers has been shutdown, then prepare route services for shutdown now (forced) for (RouteStartupOrder order : routes) { for (Service service : order.getServices()) { - prepareShutdown(service, true, true, isSuppressLoggingOnTimeout()); + prepareShutdown(service, false, true, true, isSuppressLoggingOnTimeout()); } } } else { @@ -430,14 +434,14 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS } /** - * Prepares the services for shutdown, by invoking the {@link ShutdownPrepared#prepareShutdown(boolean)} method + * Prepares the services for shutdown, by invoking the {@link ShutdownPrepared#prepareShutdown(boolean, boolean)} method * on the service if it implement this interface. * * @param service the service * @param forced whether to force shutdown * @param includeChildren whether to prepare the child of the service as well */ - private static void prepareShutdown(Service service, boolean forced, boolean includeChildren, boolean suppressLogging) { + private static void prepareShutdown(Service service, boolean suspendOnly, boolean forced, boolean includeChildren, boolean suppressLogging) { Set<Service> list; if (includeChildren) { // include error handlers as we want to prepare them for shutdown as well @@ -451,7 +455,7 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS if (child instanceof ShutdownPrepared) { try { LOG.trace("Preparing {} shutdown on {}", forced ? "forced" : "", child); - ((ShutdownPrepared) child).prepareShutdown(forced); + ((ShutdownPrepared) child).prepareShutdown(suspendOnly, forced); } catch (Exception e) { if (suppressLogging) { LOG.trace("Error during prepare shutdown on " + child + ". This exception will be ignored.", e); @@ -580,7 +584,7 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS if (service instanceof Consumer) { continue; } - prepareShutdown(service, false, true, false); + prepareShutdown(service, suspendOnly, false, true, false); } } @@ -643,7 +647,7 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS LOG.trace("Route: {} preparing to shutdown.", deferred.getRoute().getId()); boolean forced = context.getShutdownStrategy().forceShutdown(consumer); boolean suppress = context.getShutdownStrategy().isSuppressLoggingOnTimeout(); - prepareShutdown(consumer, forced, false, suppress); + prepareShutdown(consumer, suspendOnly, forced, false, suppress); LOG.debug("Route: {} preparing to shutdown complete.", deferred.getRoute().getId()); } } @@ -665,7 +669,7 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS for (Service service : order.getServices()) { boolean forced = context.getShutdownStrategy().forceShutdown(service); boolean suppress = context.getShutdownStrategy().isSuppressLoggingOnTimeout(); - prepareShutdown(service, forced, true, suppress); + prepareShutdown(service, suspendOnly, forced, true, suppress); } } } http://git-wip-us.apache.org/repos/asf/camel/blob/e75d37b3/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java index b16a5c4..a39263c 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java @@ -77,7 +77,7 @@ public abstract class ScheduledBatchPollingConsumer extends ScheduledPollConsume } @Override - public void prepareShutdown(boolean forced) { + public void prepareShutdown(boolean suspendOnly, boolean forced) { // reset task as the state of the task is not to be preserved // which otherwise may cause isBatchAllowed() to return a wrong answer this.shutdownRunningTask = null; http://git-wip-us.apache.org/repos/asf/camel/blob/e75d37b3/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java index d15d2a1..04784eb 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java @@ -323,7 +323,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme } @Override - public void prepareShutdown(boolean forced) { + public void prepareShutdown(boolean suspendOnly, boolean forced) { // prepare for shutdown, eg do not allow redelivery if configured log.trace("Prepare shutdown on error handler {}", this); preparingShutdown = true; http://git-wip-us.apache.org/repos/asf/camel/blob/e75d37b3/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index 4e3403c..5c400f6 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -1381,7 +1381,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor } @Override - public void prepareShutdown(boolean forced) { + public void prepareShutdown(boolean suspendOnly, boolean forced) { // we are shutting down, so force completion if this option was enabled // but only do this when forced=false, as that is when we have chance to // send out new messages to be routed by Camel. When forced=true, then http://git-wip-us.apache.org/repos/asf/camel/blob/e75d37b3/camel-core/src/main/java/org/apache/camel/spi/ShutdownPrepared.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/ShutdownPrepared.java b/camel-core/src/main/java/org/apache/camel/spi/ShutdownPrepared.java index 2721d7b..b6f1dd9 100644 --- a/camel-core/src/main/java/org/apache/camel/spi/ShutdownPrepared.java +++ b/camel-core/src/main/java/org/apache/camel/spi/ShutdownPrepared.java @@ -42,8 +42,9 @@ public interface ShutdownPrepared { * For forced shutdown, then the service is expected to aggressively shutdown any child services, such * as thread pools etc. This is the last chance it has to perform such duties. * - * @param forced <tt>true</tt> is forcing a more aggressive shutdown, <tt>false</tt> is for preparing to shutdown. + * @param suspendOnly <tt>true</tt> if the intention is to only suspend the service, and not stop/shutdown the service. + * @param forced <tt>true</tt> is forcing a more aggressive shutdown, <tt>false</tt> is for preparing to shutdown. */ - void prepareShutdown(boolean forced); + void prepareShutdown(boolean suspendOnly, boolean forced); } http://git-wip-us.apache.org/repos/asf/camel/blob/e75d37b3/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteStartupOrderTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteStartupOrderTest.java b/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteStartupOrderTest.java index 9eea103..a8c4c1a 100644 --- a/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteStartupOrderTest.java +++ b/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteStartupOrderTest.java @@ -41,6 +41,10 @@ public class DefaultCamelContextSuspendResumeRouteStartupOrderTest extends Conte resetMocks(); mock.expectedMessageCount(0); context.suspend(); + + // need to give seda consumer thread time to idle + Thread.sleep(500); + template.sendBody("seda:foo", "B"); mock.assertIsSatisfied(1000); http://git-wip-us.apache.org/repos/asf/camel/blob/e75d37b3/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteTest.java b/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteTest.java index fd4db6d..f7b6643 100644 --- a/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteTest.java +++ b/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteTest.java @@ -40,8 +40,14 @@ public class DefaultCamelContextSuspendResumeRouteTest extends ContextTestSuppor // now suspend and dont expect a message to be routed resetMocks(); mock.expectedMessageCount(0); + context.suspend(); + + // need to give seda consumer thread time to idle + Thread.sleep(500); + template.sendBody("seda:foo", "B"); + mock.assertIsSatisfied(1000); assertTrue(context.isSuspended()); http://git-wip-us.apache.org/repos/asf/camel/blob/e75d37b3/camel-core/src/test/java/org/apache/camel/impl/TwoRouteSuspendResumeTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/TwoRouteSuspendResumeTest.java b/camel-core/src/test/java/org/apache/camel/impl/TwoRouteSuspendResumeTest.java index 1c36f48..5f9f4ea 100644 --- a/camel-core/src/test/java/org/apache/camel/impl/TwoRouteSuspendResumeTest.java +++ b/camel-core/src/test/java/org/apache/camel/impl/TwoRouteSuspendResumeTest.java @@ -44,6 +44,9 @@ public class TwoRouteSuspendResumeTest extends ContextTestSupport { context.suspendRoute("foo"); + // need to give seda consumer thread time to idle + Thread.sleep(500); + template.sendBody("seda:foo", "B"); template.sendBody("direct:bar", "C");