Repository: camel Updated Branches: refs/heads/master d198385bb -> 3b5806bdf
CAMEL-11400: RoutePolicySupport - Should have separated suspend/resume vs start/stop consumer Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3b5806bd Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3b5806bd Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3b5806bd Branch: refs/heads/master Commit: 3b5806bdf90312b6374c8ffac4a65822b59cb255 Parents: d198385 Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Jun 13 16:55:49 2017 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Jun 13 17:36:53 2017 +0200 ---------------------------------------------------------------------- .../impl/ThrottlingExceptionRoutePolicy.java | 6 +- .../impl/ThrottlingInflightRoutePolicy.java | 4 +- .../camel/support/RoutePolicySupport.java | 84 +++++++++++++++++--- .../management/ManagedSuspendedServiceTest.java | 2 +- .../ThrottlingInflightRoutePolicyTest.java | 2 +- .../quartz/ScheduledRoutePolicy.java | 6 +- .../quartz2/ScheduledRoutePolicy.java | 6 +- 7 files changed, 88 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/3b5806bd/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java index aac5ee2..92aba77 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java @@ -201,7 +201,7 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement protected void openCircuit(Route route) { try { lock.lock(); - stopConsumer(route.getConsumer()); + suspendOrStopConsumer(route.getConsumer()); state.set(STATE_OPEN); openedAt = System.currentTimeMillis(); halfOpenTimer = new Timer(); @@ -217,7 +217,7 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement protected void halfOpenCircuit(Route route) { try { lock.lock(); - startConsumer(route.getConsumer()); + resumeOrStartConsumer(route.getConsumer()); state.set(STATE_HALF_OPEN); logState(); } catch (Exception e) { @@ -230,7 +230,7 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement protected void closeCircuit(Route route) { try { lock.lock(); - startConsumer(route.getConsumer()); + resumeOrStartConsumer(route.getConsumer()); failures.set(0); lastFailure = 0; openedAt = 0; http://git-wip-us.apache.org/repos/asf/camel/blob/3b5806bd/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java index a0a1914..8db542a 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java @@ -238,14 +238,14 @@ public class ThrottlingInflightRoutePolicy extends RoutePolicySupport implements } private void startConsumer(int size, Consumer consumer) throws Exception { - boolean started = super.startConsumer(consumer); + boolean started = resumeOrStartConsumer(consumer); if (started) { getLogger().log("Throttling consumer: " + size + " <= " + resumeInflightExchanges + " inflight exchange by resuming consumer: " + consumer); } } private void stopConsumer(int size, Consumer consumer) throws Exception { - boolean stopped = super.stopConsumer(consumer); + boolean stopped = suspendOrStopConsumer(consumer); if (stopped) { getLogger().log("Throttling consumer: " + size + " > " + maxInflightExchanges + " inflight exchange by suspending consumer: " + consumer); } http://git-wip-us.apache.org/repos/asf/camel/blob/3b5806bd/camel-core/src/main/java/org/apache/camel/support/RoutePolicySupport.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/support/RoutePolicySupport.java b/camel-core/src/main/java/org/apache/camel/support/RoutePolicySupport.java index 42e9edf..fb3580e 100644 --- a/camel-core/src/main/java/org/apache/camel/support/RoutePolicySupport.java +++ b/camel-core/src/main/java/org/apache/camel/support/RoutePolicySupport.java @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit; import org.apache.camel.Consumer; import org.apache.camel.Exchange; import org.apache.camel.Route; +import org.apache.camel.Suspendable; import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.spi.RoutePolicy; import org.apache.camel.util.ServiceHelper; @@ -75,20 +76,85 @@ public abstract class RoutePolicySupport extends ServiceSupport implements Route // noop } + /** + * Starts the consumer. + * + * @return the returned value is always <tt>true</tt> and should not be used. + * @see #resumeOrStartConsumer(Consumer) + */ public boolean startConsumer(Consumer consumer) throws Exception { - boolean resumed = ServiceHelper.resumeService(consumer); - if (resumed) { - log.debug("Resuming consumer {}", consumer); - } - return resumed; + // TODO: change to void in Camel 3.0 + ServiceHelper.startService(consumer); + log.debug("Started consumer {}", consumer); + return true; } + /** + * Stops the consumer. + * + * @return the returned value is always <tt>true</tt> and should not be used. + * @see #suspendOrStopConsumer(Consumer) + */ public boolean stopConsumer(Consumer consumer) throws Exception { - boolean suspended = ServiceHelper.suspendService(consumer); - if (suspended) { - log.debug("Suspended consumer {}", consumer); + // TODO: change to void in Camel 3.0 + // stop and shutdown + ServiceHelper.stopAndShutdownServices(consumer); + log.debug("Stopped consumer {}", consumer); + return true; + } + + /** + * Suspends or stops the consumer. + * + * If the consumer is {@link org.apache.camel.Suspendable} then the consumer is suspended, + * otherwise the consumer is stopped. + * + * @see #stopConsumer(Consumer) + * @return <tt>true</tt> if the consumer was suspended or stopped, <tt>false</tt> if the consumer was already suspend or stopped + */ + public boolean suspendOrStopConsumer(Consumer consumer) throws Exception { + if (consumer instanceof Suspendable) { + boolean suspended = ServiceHelper.suspendService(consumer); + if (suspended) { + log.debug("Suspended consumer {}", consumer); + } else { + log.trace("Consumer already suspended {}", consumer); + } + return suspended; + } + if (!ServiceHelper.isStopped(consumer)) { + ServiceHelper.stopService(consumer); + log.debug("Stopped consumer {}", consumer); + return true; + } + return false; + } + + /** + * Resumes or starts the consumer. + * + * If the consumer is {@link org.apache.camel.Suspendable} then the consumer is resumed, + * otherwise the consumer is started. + * + * @see #startConsumer(Consumer) + * @return <tt>true</tt> if the consumer was resumed or started, <tt>false</tt> if the consumer was already resumed or started + */ + public boolean resumeOrStartConsumer(Consumer consumer) throws Exception { + if (consumer instanceof Suspendable) { + boolean resumed = ServiceHelper.resumeService(consumer); + if (resumed) { + log.debug("Resumed consumer {}", consumer); + } else { + log.trace("Consumer already resumed {}", consumer); + } + return resumed; + } + if (!ServiceHelper.isStarted(consumer)) { + ServiceHelper.startService(consumer); + log.debug("Started consumer {}", consumer); + return true; } - return suspended; + return false; } public void startRoute(Route route) throws Exception { http://git-wip-us.apache.org/repos/asf/camel/blob/3b5806bd/camel-core/src/test/java/org/apache/camel/management/ManagedSuspendedServiceTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedSuspendedServiceTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedSuspendedServiceTest.java index ab4b197..974dc9f 100644 --- a/camel-core/src/test/java/org/apache/camel/management/ManagedSuspendedServiceTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedSuspendedServiceTest.java @@ -119,7 +119,7 @@ public class ManagedSuspendedServiceTest extends ManagementTestSupport { // only stop it at first run if (counter++ == 0) { try { - super.stopConsumer(route.getConsumer()); + super.suspendOrStopConsumer(route.getConsumer()); } catch (Exception e) { handleException(e); } http://git-wip-us.apache.org/repos/asf/camel/blob/3b5806bd/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyTest.java index 8ef976d..66729cd 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyTest.java @@ -35,7 +35,6 @@ public class ThrottlingInflightRoutePolicyTest extends ContextTestSupport { for (int i = 0; i < size; i++) { template.sendBody(url, "Message " + i); - Thread.sleep(3); } assertMockEndpointsSatisfied(); @@ -51,6 +50,7 @@ public class ThrottlingInflightRoutePolicyTest extends ContextTestSupport { from(url) .routePolicy(policy) + .delay(3) .to("log:foo?groupSize=10").to("mock:result"); } }; http://git-wip-us.apache.org/repos/asf/camel/blob/3b5806bd/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicy.java ---------------------------------------------------------------------- diff --git a/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicy.java b/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicy.java index 64aeaf2..7acb796 100644 --- a/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicy.java +++ b/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicy.java @@ -52,7 +52,7 @@ public abstract class ScheduledRoutePolicy extends RoutePolicySupport implements startRoute(route); // here we just check the states of the Consumer } else if (ServiceHelper.isSuspended(route.getConsumer())) { - startConsumer(route.getConsumer()); + resumeOrStartConsumer(route.getConsumer()); } } else if (action == Action.STOP) { if ((routeStatus == ServiceStatus.Started) || (routeStatus == ServiceStatus.Suspended)) { @@ -62,14 +62,14 @@ public abstract class ScheduledRoutePolicy extends RoutePolicySupport implements } } else if (action == Action.SUSPEND) { if (routeStatus == ServiceStatus.Started) { - stopConsumer(route.getConsumer()); + suspendOrStopConsumer(route.getConsumer()); } else { LOG.warn("Route is not in a started state and cannot be suspended. The current route state is {}", routeStatus); } } else if (action == Action.RESUME) { if (routeStatus == ServiceStatus.Started) { if (ServiceHelper.isSuspended(route.getConsumer())) { - startConsumer(route.getConsumer()); + resumeOrStartConsumer(route.getConsumer()); } else { LOG.warn("The Consumer {} is not suspended and cannot be resumed.", route.getConsumer()); } http://git-wip-us.apache.org/repos/asf/camel/blob/3b5806bd/components/camel-quartz2/src/main/java/org/apache/camel/routepolicy/quartz2/ScheduledRoutePolicy.java ---------------------------------------------------------------------- diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/routepolicy/quartz2/ScheduledRoutePolicy.java b/components/camel-quartz2/src/main/java/org/apache/camel/routepolicy/quartz2/ScheduledRoutePolicy.java index 3f6639c..cd7050b 100644 --- a/components/camel-quartz2/src/main/java/org/apache/camel/routepolicy/quartz2/ScheduledRoutePolicy.java +++ b/components/camel-quartz2/src/main/java/org/apache/camel/routepolicy/quartz2/ScheduledRoutePolicy.java @@ -66,7 +66,7 @@ public abstract class ScheduledRoutePolicy extends RoutePolicySupport implements startRoute(route); // here we just check the states of the Consumer } else if (ServiceHelper.isSuspended(route.getConsumer())) { - startConsumer(route.getConsumer()); + resumeOrStartConsumer(route.getConsumer()); } } else if (action == Action.STOP) { if ((routeStatus == ServiceStatus.Started) || (routeStatus == ServiceStatus.Suspended)) { @@ -76,14 +76,14 @@ public abstract class ScheduledRoutePolicy extends RoutePolicySupport implements } } else if (action == Action.SUSPEND) { if (routeStatus == ServiceStatus.Started) { - stopConsumer(route.getConsumer()); + suspendOrStopConsumer(route.getConsumer()); } else { LOG.warn("Route is not in a started state and cannot be suspended. The current route state is {}", routeStatus); } } else if (action == Action.RESUME) { if (routeStatus == ServiceStatus.Started) { if (ServiceHelper.isSuspended(route.getConsumer())) { - startConsumer(route.getConsumer()); + resumeOrStartConsumer(route.getConsumer()); } else { LOG.warn("The Consumer {} is not suspended and cannot be resumed.", route.getConsumer()); }