Author: dkulp
Date: Fri Jul  8 20:22:25 2011
New Revision: 1144486

URL: http://svn.apache.org/viewvc?rev=1144486&view=rev
Log:
Merged revisions 1139749 via svnmerge from 
https://svn.apache.org/repos/asf/camel/trunk

........
  r1139749 | davsclaus | 2011-06-26 05:35:51 -0400 (Sun, 26 Jun 2011) | 1 line
  
  CAMEL-4149: Fixed throttling route policy in context scope may not resume 
route when hitting low watermark
........

Added:
    
camel/branches/camel-2.7.x/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyContextScopeTest.java
      - copied unchanged from r1139749, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyContextScopeTest.java
Modified:
    camel/branches/camel-2.7.x/   (props changed)
    
camel/branches/camel-2.7.x/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
    
camel/branches/camel-2.7.x/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java

Propchange: camel/branches/camel-2.7.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jul  8 20:22:25 2011
@@ -1 +1 @@
-/camel/trunk:1083696,1083723-1083724,1084150,1085277,1085543,1085549,1085905,1085909,1086165,1086231,1087005,1087276,1087612,1087620,1087856,1088583,1088916-1088917,1089275,1089348,1090166,1090204,1090564,1090960-1090969,1091082,1091518,1091771,1091799,1092034,1092068,1092577,1092667,1093978,1093980,1093999,1094123,1094147,1094156,1095405,1095469,1095471,1095475-1095476,1096346,1096736,1097761,1097909,1097912,1097978,1098032,1098628,1098630,1099228,1099417,1100975,1102162,1102177,1102181,1104076,1124497,1127744,1127988,1128315,1128970,1131411,1132961,1134252,1134260,1134404,1134501,1134626,1134681,1134714-1134911,1135223,1135364,1136065,1136290,1138285,1139163,1140096-1140102,1141783,1143925,1144248,1144324
+/camel/trunk:1083696,1083723-1083724,1084150,1085277,1085543,1085549,1085905,1085909,1086165,1086231,1087005,1087276,1087612,1087620,1087856,1088583,1088916-1088917,1089275,1089348,1090166,1090204,1090564,1090960-1090969,1091082,1091518,1091771,1091799,1092034,1092068,1092577,1092667,1093978,1093980,1093999,1094123,1094147,1094156,1095405,1095469,1095471,1095475-1095476,1096346,1096736,1097761,1097909,1097912,1097978,1098032,1098628,1098630,1099228,1099417,1100975,1102162,1102177,1102181,1104076,1124497,1127744,1127988,1128315,1128970,1131411,1132961,1134252,1134260,1134404,1134501,1134626,1134681,1134714-1134911,1135223,1135364,1136065,1136290,1138285,1139163,1139749,1140096-1140102,1141783,1143925,1144248,1144324

Propchange: camel/branches/camel-2.7.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: 
camel/branches/camel-2.7.x/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.7.x/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java?rev=1144486&r1=1144485&r2=1144486&view=diff
==============================================================================
--- 
camel/branches/camel-2.7.x/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
 (original)
+++ 
camel/branches/camel-2.7.x/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
 Fri Jul  8 20:22:25 2011
@@ -155,16 +155,18 @@ public class DefaultRouteContext impleme
             RoutePolicyProcessor routePolicyProcessor = null;
             List<RoutePolicy> routePolicyList = getRoutePolicyList();
             if (routePolicyList != null && !routePolicyList.isEmpty()) {
-                routePolicyProcessor = new 
RoutePolicyProcessor(unitOfWorkProcessor, routePolicyList);
-
-                // add it as service if we have not already done that (eg 
possible if two routes have the same service)
-                if (!camelContext.hasService(routePolicyProcessor)) {
-                    try {
-                        camelContext.addService(routePolicyProcessor);
-                    } catch (Exception e) {
-                        throw ObjectHelper.wrapRuntimeCamelException(e);
+                for (RoutePolicy policy : routePolicyList) {
+                    // add policy as service if we have not already done that 
(eg possible if two routes have the same service)
+                    // this ensures Camel can control the lifecycle of the 
policy
+                    if (!camelContext.hasService(policy)) {
+                        try {
+                            camelContext.addService(policy);
+                        } catch (Exception e) {
+                            throw ObjectHelper.wrapRuntimeCamelException(e);
+                        }
                     }
                 }
+                routePolicyProcessor = new 
RoutePolicyProcessor(unitOfWorkProcessor, routePolicyList);
                 target = routePolicyProcessor;
             } else {
                 target = unitOfWorkProcessor;

Modified: 
camel/branches/camel-2.7.x/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.7.x/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java?rev=1144486&r1=1144485&r2=1144486&view=diff
==============================================================================
--- 
camel/branches/camel-2.7.x/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java
 (original)
+++ 
camel/branches/camel-2.7.x/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java
 Fri Jul  8 20:22:25 2011
@@ -16,29 +16,48 @@
  */
 package org.apache.camel.impl;
 
+import java.util.EventObject;
+import java.util.LinkedHashSet;
+import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.Route;
+import org.apache.camel.management.EventNotifierSupport;
+import org.apache.camel.management.event.ExchangeCompletedEvent;
 import org.apache.camel.processor.CamelLogger;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
 import org.slf4j.LoggerFactory;
 
 /**
  * A throttle based {@link org.apache.camel.spi.RoutePolicy} which is capable 
of dynamic
  * throttling a route based on number of current inflight exchanges.
+ * <p/>
+ * This implementation supports two scopes {@link ThrottlingScope#Context} and 
{@link ThrottlingScope#Route} (is default).
+ * If context scope is selected then this implementation will use a {@link 
org.apache.camel.spi.EventNotifier} to listen
+ * for events when {@link Exchange}s is done, and trigger the {@link 
#throttle(org.apache.camel.Route, org.apache.camel.Exchange)}
+ * method. If the route scope is selected then <b>no</b> {@link 
org.apache.camel.spi.EventNotifier} is in use, as there is already
+ * a {@link org.apache.camel.spi.Synchronization} callback on the current 
{@link Exchange} which triggers the
+ * {@link #throttle(org.apache.camel.Route, org.apache.camel.Exchange)} when 
the current {@link Exchange} is done.
  *
  * @version 
  */
-public class ThrottlingInflightRoutePolicy extends RoutePolicySupport {
+public class ThrottlingInflightRoutePolicy extends RoutePolicySupport 
implements CamelContextAware {
 
     public enum ThrottlingScope {
         Context, Route
     }
 
+    private final Set<Route> routes = new LinkedHashSet<Route>();
+    private ContextScopedEventNotifier eventNotifier;
+    private CamelContext camelContext;
     private final Lock lock = new ReentrantLock();
     private ThrottlingScope scope = ThrottlingScope.Route;
     private int maxInflightExchanges = 1000;
@@ -55,12 +74,45 @@ public class ThrottlingInflightRoutePoli
         return "ThrottlingInflightRoutePolicy[" + maxInflightExchanges + " / " 
+ resumePercentOfMax + "% using scope " + scope + "]";
     }
 
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public void onInit(Route route) {
+        // we need to remember the routes we apply for
+        routes.add(route);
+    }
+
+    @Override
     public void onExchangeDone(Route route, Exchange exchange) {
+        // if route scoped then throttle directly
+        // as context scoped is handled using an EventNotifier instead
+        if (scope == ThrottlingScope.Route) {
+            throttle(route, exchange);
+        }
+    }
+
+    /**
+     * Throttles the route when {@link Exchange}s is done.
+     *
+     * @param route  the route
+     * @param exchange the exchange
+     */
+    protected void throttle(Route route, Exchange exchange) {
         // this works the best when this logic is executed when the exchange 
is done
         Consumer consumer = route.getConsumer();
 
         int size = getSize(consumer, exchange);
-        if (maxInflightExchanges > 0 && size > maxInflightExchanges) {
+        boolean stop = maxInflightExchanges > 0 && size > maxInflightExchanges;
+        if (log.isTraceEnabled()) {
+            log.trace("{} > 0 && {} > {} evaluated as {}", new 
Object[]{maxInflightExchanges, size, maxInflightExchanges, stop});
+        }
+        if (stop) {
             try {
                 lock.lock();
                 stopConsumer(size, consumer);
@@ -74,7 +126,11 @@ public class ThrottlingInflightRoutePoli
         // reload size in case a race condition with too many at once being 
invoked
         // so we need to ensure that we read the most current size and start 
the consumer if we are already to low
         size = getSize(consumer, exchange);
-        if (size <= resumeInflightExchanges) {
+        boolean start = size <= resumeInflightExchanges;
+        if (log.isTraceEnabled()) {
+            log.trace("{} <= {} evaluated as {}", new Object[]{size, 
resumeInflightExchanges, start});
+        }
+        if (start) {
             try {
                 lock.lock();
                 startConsumer(size, consumer);
@@ -185,16 +241,73 @@ public class ThrottlingInflightRoutePoli
     private void startConsumer(int size, Consumer consumer) throws Exception {
         boolean started = super.startConsumer(consumer);
         if (started) {
-            getLogger().log("Throttling consumer: " + size + " <= " + 
resumeInflightExchanges + " inflight exchange by resuming consumer.");
+            getLogger().log("Throttling consumer: " + size + " <= " + 
resumeInflightExchanges + " inflight exchange by resuming consumer: " + 
consumer);
         }
     }
 
     private void stopConsumer(int size, Consumer consumer) throws Exception {
         boolean stopped = super.stopConsumer(consumer);
         if (stopped) {
-            getLogger().log("Throttling consumer: " + size + " > " + 
maxInflightExchanges + " inflight exchange by suspending consumer.");
+            getLogger().log("Throttling consumer: " + size + " > " + 
maxInflightExchanges + " inflight exchange by suspending consumer: " + 
consumer);
+        }
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        ObjectHelper.notNull(camelContext, "CamelContext", this);
+        if (scope == ThrottlingScope.Context) {
+            eventNotifier = new ContextScopedEventNotifier();
+            // must start the notifier before it can be used
+            ServiceHelper.startService(eventNotifier);
+            // we are in context scope, so we need to use an event notifier to 
keep track
+            // when any exchanges is done on the camel context.
+            // This ensures we can trigger accordingly to context scope
+            
camelContext.getManagementStrategy().addEventNotifier(eventNotifier);
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        ObjectHelper.notNull(camelContext, "CamelContext", this);
+        if (scope == ThrottlingScope.Context) {
+            
camelContext.getManagementStrategy().removeEventNotifier(eventNotifier);
         }
     }
 
+    /**
+     * {@link org.apache.camel.spi.EventNotifier} to keep track on when {@link 
Exchange}
+     * is done, so we can throttle accordingly.
+     */
+    private class ContextScopedEventNotifier extends EventNotifierSupport {
+
+        @Override
+        public void notify(EventObject event) throws Exception {
+            // if context
+            ExchangeCompletedEvent completedEvent = (ExchangeCompletedEvent) 
event;
+            for (Route route : routes) {
+                throttle(route, completedEvent.getExchange());
+            }
+        }
+
+        @Override
+        public boolean isEnabled(EventObject event) {
+            return event instanceof ExchangeCompletedEvent;
+        }
+
+        @Override
+        protected void doStart() throws Exception {
+            // noop
+        }
+
+        @Override
+        protected void doStop() throws Exception {
+            // noop
+        }
+
+        @Override
+        public String toString() {
+            return "ContextScopedEventNotifier";
+        }
+    }
 
 }


Reply via email to