Repository: camel
Updated Branches:
  refs/heads/master d198385bb -> 3b5806bdf


CAMEL-11400: RoutePolicySupport - Should have separated suspend/resume vs 
start/stop consumer


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3b5806bd
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3b5806bd
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3b5806bd

Branch: refs/heads/master
Commit: 3b5806bdf90312b6374c8ffac4a65822b59cb255
Parents: d198385
Author: Claus Ibsen <davscl...@apache.org>
Authored: Tue Jun 13 16:55:49 2017 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Tue Jun 13 17:36:53 2017 +0200

----------------------------------------------------------------------
 .../impl/ThrottlingExceptionRoutePolicy.java    |  6 +-
 .../impl/ThrottlingInflightRoutePolicy.java     |  4 +-
 .../camel/support/RoutePolicySupport.java       | 84 +++++++++++++++++---
 .../management/ManagedSuspendedServiceTest.java |  2 +-
 .../ThrottlingInflightRoutePolicyTest.java      |  2 +-
 .../quartz/ScheduledRoutePolicy.java            |  6 +-
 .../quartz2/ScheduledRoutePolicy.java           |  6 +-
 7 files changed, 88 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3b5806bd/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
 
b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
index aac5ee2..92aba77 100644
--- 
a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
+++ 
b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
@@ -201,7 +201,7 @@ public class ThrottlingExceptionRoutePolicy extends 
RoutePolicySupport implement
     protected void openCircuit(Route route) {
         try {
             lock.lock();
-            stopConsumer(route.getConsumer());
+            suspendOrStopConsumer(route.getConsumer());
             state.set(STATE_OPEN);
             openedAt = System.currentTimeMillis();
             halfOpenTimer = new Timer();
@@ -217,7 +217,7 @@ public class ThrottlingExceptionRoutePolicy extends 
RoutePolicySupport implement
     protected void halfOpenCircuit(Route route) {
         try {
             lock.lock();
-            startConsumer(route.getConsumer());
+            resumeOrStartConsumer(route.getConsumer());
             state.set(STATE_HALF_OPEN);
             logState();
         } catch (Exception e) {
@@ -230,7 +230,7 @@ public class ThrottlingExceptionRoutePolicy extends 
RoutePolicySupport implement
     protected void closeCircuit(Route route) {
         try {
             lock.lock();
-            startConsumer(route.getConsumer());
+            resumeOrStartConsumer(route.getConsumer());
             failures.set(0);
             lastFailure = 0;
             openedAt = 0;

http://git-wip-us.apache.org/repos/asf/camel/blob/3b5806bd/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java
 
b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java
index a0a1914..8db542a 100644
--- 
a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java
+++ 
b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java
@@ -238,14 +238,14 @@ public class ThrottlingInflightRoutePolicy extends 
RoutePolicySupport implements
     }
 
     private void startConsumer(int size, Consumer consumer) throws Exception {
-        boolean started = super.startConsumer(consumer);
+        boolean started = resumeOrStartConsumer(consumer);
         if (started) {
             getLogger().log("Throttling consumer: " + size + " <= " + 
resumeInflightExchanges + " inflight exchange by resuming consumer: " + 
consumer);
         }
     }
 
     private void stopConsumer(int size, Consumer consumer) throws Exception {
-        boolean stopped = super.stopConsumer(consumer);
+        boolean stopped = suspendOrStopConsumer(consumer);
         if (stopped) {
             getLogger().log("Throttling consumer: " + size + " > " + 
maxInflightExchanges + " inflight exchange by suspending consumer: " + 
consumer);
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/3b5806bd/camel-core/src/main/java/org/apache/camel/support/RoutePolicySupport.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/support/RoutePolicySupport.java 
b/camel-core/src/main/java/org/apache/camel/support/RoutePolicySupport.java
index 42e9edf..fb3580e 100644
--- a/camel-core/src/main/java/org/apache/camel/support/RoutePolicySupport.java
+++ b/camel-core/src/main/java/org/apache/camel/support/RoutePolicySupport.java
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Route;
+import org.apache.camel.Suspendable;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.RoutePolicy;
 import org.apache.camel.util.ServiceHelper;
@@ -75,20 +76,85 @@ public abstract class RoutePolicySupport extends 
ServiceSupport implements Route
         // noop
     }
 
+    /**
+     * Starts the consumer.
+     *
+     * @return the returned value is always <tt>true</tt> and should not be 
used.
+     * @see #resumeOrStartConsumer(Consumer)
+     */
     public boolean startConsumer(Consumer consumer) throws Exception {
-        boolean resumed = ServiceHelper.resumeService(consumer);
-        if (resumed) {
-            log.debug("Resuming consumer {}", consumer);
-        }
-        return resumed;
+        // TODO: change to void in Camel 3.0
+        ServiceHelper.startService(consumer);
+        log.debug("Started consumer {}", consumer);
+        return true;
     }
 
+    /**
+     * Stops the consumer.
+     *
+     * @return the returned value is always <tt>true</tt> and should not be 
used.
+     * @see #suspendOrStopConsumer(Consumer)
+     */
     public boolean stopConsumer(Consumer consumer) throws Exception {
-        boolean suspended = ServiceHelper.suspendService(consumer);
-        if (suspended) {
-            log.debug("Suspended consumer {}", consumer);
+        // TODO: change to void in Camel 3.0
+        // stop and shutdown
+        ServiceHelper.stopAndShutdownServices(consumer);
+        log.debug("Stopped consumer {}", consumer);
+        return true;
+    }
+
+    /**
+     * Suspends or stops the consumer.
+     *
+     * If the consumer is {@link org.apache.camel.Suspendable} then the 
consumer is suspended,
+     * otherwise the consumer is stopped.
+     *
+     * @see #stopConsumer(Consumer)
+     * @return <tt>true</tt> if the consumer was suspended or stopped, 
<tt>false</tt> if the consumer was already suspend or stopped
+     */
+    public boolean suspendOrStopConsumer(Consumer consumer) throws Exception {
+        if (consumer instanceof Suspendable) {
+            boolean suspended = ServiceHelper.suspendService(consumer);
+            if (suspended) {
+                log.debug("Suspended consumer {}", consumer);
+            } else {
+                log.trace("Consumer already suspended {}", consumer);
+            }
+            return suspended;
+        }
+        if (!ServiceHelper.isStopped(consumer)) {
+            ServiceHelper.stopService(consumer);
+            log.debug("Stopped consumer {}", consumer);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Resumes or starts the consumer.
+     *
+     * If the consumer is {@link org.apache.camel.Suspendable} then the 
consumer is resumed,
+     * otherwise the consumer is started.
+     *
+     * @see #startConsumer(Consumer)
+     * @return <tt>true</tt> if the consumer was resumed or started, 
<tt>false</tt> if the consumer was already resumed or started
+     */
+    public boolean resumeOrStartConsumer(Consumer consumer) throws Exception {
+        if (consumer instanceof Suspendable) {
+            boolean resumed = ServiceHelper.resumeService(consumer);
+            if (resumed) {
+                log.debug("Resumed consumer {}", consumer);
+            } else {
+                log.trace("Consumer already resumed {}", consumer);
+            }
+            return resumed;
+        }
+        if (!ServiceHelper.isStarted(consumer)) {
+            ServiceHelper.startService(consumer);
+            log.debug("Started consumer {}", consumer);
+            return true;
         }
-        return suspended;
+        return false;
     }
 
     public void startRoute(Route route) throws Exception {

http://git-wip-us.apache.org/repos/asf/camel/blob/3b5806bd/camel-core/src/test/java/org/apache/camel/management/ManagedSuspendedServiceTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/management/ManagedSuspendedServiceTest.java
 
b/camel-core/src/test/java/org/apache/camel/management/ManagedSuspendedServiceTest.java
index ab4b197..974dc9f 100644
--- 
a/camel-core/src/test/java/org/apache/camel/management/ManagedSuspendedServiceTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/management/ManagedSuspendedServiceTest.java
@@ -119,7 +119,7 @@ public class ManagedSuspendedServiceTest extends 
ManagementTestSupport {
             // only stop it at first run
             if (counter++ == 0) {
                 try {
-                    super.stopConsumer(route.getConsumer());
+                    super.suspendOrStopConsumer(route.getConsumer());
                 } catch (Exception e) {
                     handleException(e);
                 }

http://git-wip-us.apache.org/repos/asf/camel/blob/3b5806bd/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyTest.java
index 8ef976d..66729cd 100644
--- 
a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyTest.java
@@ -35,7 +35,6 @@ public class ThrottlingInflightRoutePolicyTest extends 
ContextTestSupport {
 
         for (int i = 0; i < size; i++) {
             template.sendBody(url, "Message " + i);
-            Thread.sleep(3);
         }
 
         assertMockEndpointsSatisfied();
@@ -51,6 +50,7 @@ public class ThrottlingInflightRoutePolicyTest extends 
ContextTestSupport {
 
                 from(url)
                     .routePolicy(policy)
+                    .delay(3)
                     .to("log:foo?groupSize=10").to("mock:result");
             }
         };

http://git-wip-us.apache.org/repos/asf/camel/blob/3b5806bd/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicy.java
----------------------------------------------------------------------
diff --git 
a/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicy.java
 
b/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicy.java
index 64aeaf2..7acb796 100644
--- 
a/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicy.java
+++ 
b/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicy.java
@@ -52,7 +52,7 @@ public abstract class ScheduledRoutePolicy extends 
RoutePolicySupport implements
                 startRoute(route);
                 // here we just check the states of the Consumer
             } else if (ServiceHelper.isSuspended(route.getConsumer())) {
-                startConsumer(route.getConsumer());
+                resumeOrStartConsumer(route.getConsumer());
             }
         } else if (action == Action.STOP) {
             if ((routeStatus == ServiceStatus.Started) || (routeStatus == 
ServiceStatus.Suspended)) {
@@ -62,14 +62,14 @@ public abstract class ScheduledRoutePolicy extends 
RoutePolicySupport implements
             }
         } else if (action == Action.SUSPEND) {
             if (routeStatus == ServiceStatus.Started) {
-                stopConsumer(route.getConsumer());
+                suspendOrStopConsumer(route.getConsumer());
             } else {
                 LOG.warn("Route is not in a started state and cannot be 
suspended. The current route state is {}", routeStatus);
             }
         } else if (action == Action.RESUME) {
             if (routeStatus == ServiceStatus.Started) {
                 if (ServiceHelper.isSuspended(route.getConsumer())) {
-                    startConsumer(route.getConsumer());
+                    resumeOrStartConsumer(route.getConsumer());
                 } else {
                     LOG.warn("The Consumer {} is not suspended and cannot be 
resumed.", route.getConsumer());
                 }

http://git-wip-us.apache.org/repos/asf/camel/blob/3b5806bd/components/camel-quartz2/src/main/java/org/apache/camel/routepolicy/quartz2/ScheduledRoutePolicy.java
----------------------------------------------------------------------
diff --git 
a/components/camel-quartz2/src/main/java/org/apache/camel/routepolicy/quartz2/ScheduledRoutePolicy.java
 
b/components/camel-quartz2/src/main/java/org/apache/camel/routepolicy/quartz2/ScheduledRoutePolicy.java
index 3f6639c..cd7050b 100644
--- 
a/components/camel-quartz2/src/main/java/org/apache/camel/routepolicy/quartz2/ScheduledRoutePolicy.java
+++ 
b/components/camel-quartz2/src/main/java/org/apache/camel/routepolicy/quartz2/ScheduledRoutePolicy.java
@@ -66,7 +66,7 @@ public abstract class ScheduledRoutePolicy extends 
RoutePolicySupport implements
                 startRoute(route);
                 // here we just check the states of the Consumer
             } else if (ServiceHelper.isSuspended(route.getConsumer())) {
-                startConsumer(route.getConsumer());
+                resumeOrStartConsumer(route.getConsumer());
             }
         } else if (action == Action.STOP) {
             if ((routeStatus == ServiceStatus.Started) || (routeStatus == 
ServiceStatus.Suspended)) {
@@ -76,14 +76,14 @@ public abstract class ScheduledRoutePolicy extends 
RoutePolicySupport implements
             }
         } else if (action == Action.SUSPEND) {
             if (routeStatus == ServiceStatus.Started) {
-                stopConsumer(route.getConsumer());
+                suspendOrStopConsumer(route.getConsumer());
             } else {
                 LOG.warn("Route is not in a started state and cannot be 
suspended. The current route state is {}", routeStatus);
             }
         } else if (action == Action.RESUME) {
             if (routeStatus == ServiceStatus.Started) {
                 if (ServiceHelper.isSuspended(route.getConsumer())) {
-                    startConsumer(route.getConsumer());
+                    resumeOrStartConsumer(route.getConsumer());
                 } else {
                     LOG.warn("The Consumer {} is not suspended and cannot be 
resumed.", route.getConsumer());
                 }

Reply via email to