Author: davsclaus
Date: Thu Mar  8 16:08:07 2012
New Revision: 1298448

URL: http://svn.apache.org/viewvc?rev=1298448&view=rev
Log:
CAMEL-5057: Avoid synchronized in inflight registry.

Added:
    camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java
      - copied unchanged from r1298447, camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java
Removed:
    camel/branches/camel-2.9.x/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryConcurrentTest.java
Modified:
    camel/branches/camel-2.9.x/   (props changed)
    camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
    camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
    camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
    camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java
    camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
    camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java
    camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java
    camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
    camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java
    camel/branches/camel-2.9.x/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java

Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar  8 16:08:07 2012
@@ -1 +1 @@
-/camel/trunk:1243046,1243057,1243234,1244518,1244644,1244859,1244861,1244864,1244870,1244872,1245021,1291555,1291727,1291848,1291864,1292114,1292384,1292725,1292760,1292767,1293079,1293268,1293288,1293330,1293590,1293828,1293852,1293855,1294130,1294482,1294502,1294533,1294588,1294639,1294709,1294909,1294976,1295073,1295108,1295120,1296653,1296790,1298125,1298155
+/camel/trunk:1243046,1243057,1243234,1244518,1244644,1244859,1244861,1244864,1244870,1244872,1245021,1291555,1291727,1291848,1291864,1292114,1292384,1292725,1292760,1292767,1293079,1293268,1293288,1293330,1293590,1293828,1293852,1293855,1294130,1294482,1294502,1294533,1294588,1294639,1294709,1294909,1294976,1295073,1295108,1295120,1296653,1296790,1298125,1298155,1298447

Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java?rev=1298448&r1=1298447&r2=1298448&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java (original)
+++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java Thu Mar  8 16:08:07 2012
@@ -16,8 +16,8 @@
  */
 package org.apache.camel.impl;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.Endpoint;
@@ -36,63 +36,48 @@ public class DefaultInflightRepository e
 
     private static final transient Logger LOG = LoggerFactory.getLogger(DefaultInflightRepository.class);
     private final AtomicInteger totalCount = new AtomicInteger();
-    // use endpoint key as key so endpoints with lenient properties is registered using the same key (eg dynamic http endpoints)
-    private final Map<String, AtomicInteger> endpointCount = new HashMap<String, AtomicInteger>();
+    private final ConcurrentMap<String, AtomicInteger> routeCount = new ConcurrentHashMap<String, AtomicInteger>();
 
     public void add(Exchange exchange) {
-        int count = totalCount.incrementAndGet();
-        LOG.trace("Total {} inflight exchanges. Last added: {}", count, exchange.getExchangeId());
-
-        if (exchange.getFromEndpoint() == null) {
-            return;
-        }
-
-        String key = exchange.getFromEndpoint().getEndpointKey();
-        // need to be synchronized as we can concurrently add/remove
-        synchronized (endpointCount) {
-            AtomicInteger existing = endpointCount.get(key);
-            if (existing != null) {
-                existing.incrementAndGet();
-            } else {
-                endpointCount.put(key, new AtomicInteger(1));
-            }
-        }
+        totalCount.incrementAndGet();
     }
 
     public void remove(Exchange exchange) {
-        int count = totalCount.decrementAndGet();
-        LOG.trace("Total {} inflight exchanges. Last removed: {}", count, exchange.getExchangeId());
-
-        if (exchange.getFromEndpoint() == null) {
-            return;
-        }
+        totalCount.decrementAndGet();
+    }
 
-        String key = exchange.getFromEndpoint().getEndpointKey();
-        // need to be synchronized as we can concurrently add/remove
-        synchronized (endpointCount) {
-            AtomicInteger existing = endpointCount.get(key);
-            if (existing != null) {
-                if (existing.decrementAndGet() <= 0) {
-                    endpointCount.remove(key);
-                }
-            }
+    public void add(Exchange exchange, String routeId) {
+        AtomicInteger existing = routeCount.putIfAbsent(routeId, new AtomicInteger(1));
+        if (existing != null) {
+            existing.incrementAndGet();
         }
     }
 
-    /**
-     * Internal only - Used for testing purpose.
-     */
-    int endpointSize() {
-        return endpointCount.size();
+    public void remove(Exchange exchange, String routeId) {
+        AtomicInteger existing = routeCount.get(routeId);
+        if (existing != null) {
+            existing.decrementAndGet();
+        }
     }
 
     public int size() {
         return totalCount.get();
     }
 
+    @Deprecated
     public int size(Endpoint endpoint) {
-        AtomicInteger answer = endpointCount.get(endpoint.getEndpointKey());
-        return answer != null ? answer.get() : 0;
+        return 0;
+    }
+
+    @Override
+    public void removeRoute(String routeId) {
+        routeCount.remove(routeId);
+    }
+
+    @Override
+    public int size(String routeId) {
+        AtomicInteger existing = routeCount.get(routeId);
+        return existing != null ? existing.get() : 0; 
     }
 
     @Override
@@ -107,8 +92,6 @@ public class DefaultInflightRepository e
         } else {
             LOG.info("Shutting down with no inflight exchanges.");
         }
-        synchronized (endpointCount) {
-            endpointCount.clear();
-        }
+        routeCount.clear();
     }
 }

Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java?rev=1298448&r1=1298447&r2=1298448&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java (original)
+++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java Thu Mar  8 16:08:07 2012
@@ -35,6 +35,7 @@ import org.apache.camel.model.FromDefini
 import org.apache.camel.model.ProcessorDefinition;
 import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.processor.Pipeline;
+import org.apache.camel.processor.RouteInflightRepositoryProcessor;
 import org.apache.camel.processor.RoutePolicyProcessor;
 import org.apache.camel.processor.UnitOfWorkProcessor;
 import org.apache.camel.spi.InterceptStrategy;
@@ -168,14 +169,17 @@ public class DefaultRouteContext impleme
                 target = unitOfWorkProcessor;
             }
 
+            // wrap in route inflight processor to track number of inflight exchanges for the route
+            RouteInflightRepositoryProcessor inflight = new RouteInflightRepositoryProcessor(camelContext.getInflightRepository(), target);
+
             // and wrap it by a instrumentation processor that is to be used for performance stats
             // for this particular route
-            InstrumentationProcessor wrapper = new InstrumentationProcessor();
-            wrapper.setType("route");
-            wrapper.setProcessor(target);
+            InstrumentationProcessor instrument = new InstrumentationProcessor();
+            instrument.setType("route");
+            instrument.setProcessor(inflight);
 
             // and create the route that wraps the UoW
-            Route edcr = new EventDrivenConsumerRoute(this, getEndpoint(), wrapper);
+            Route edcr = new EventDrivenConsumerRoute(this, getEndpoint(), instrument);
             // create the route id
             String routeId = route.idOrCreate(getCamelContext().getNodeIdFactory());
             edcr.getProperties().put(Route.ID_PROPERTY, routeId);
@@ -188,6 +192,8 @@ public class DefaultRouteContext impleme
             if (routePolicyProcessor != null) {
                 routePolicyProcessor.setRoute(edcr);
             }
+            // after the route is created then set the route on the inflight processor so we get hold of it
+            inflight.setRoute(edcr);
 
             // invoke init on route policy
             if (routePolicyList != null && !routePolicyList.isEmpty()) {

Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java?rev=1298448&r1=1298447&r2=1298448&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java (original)
+++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java Thu Mar  8 16:08:07 2012
@@ -432,17 +432,17 @@ public class DefaultShutdownStrategy ext
             while (!done) {
                 int size = 0;
                 for (RouteStartupOrder order : routes) {
+                    int inflight = context.getInflightRepository().size(order.getRoute().getId());
                     for (Consumer consumer : order.getInputs()) {
-                        int inflight = context.getInflightRepository().size(consumer.getEndpoint());
                         // include any additional pending exchanges on some consumers which may have internal
                         // memory queues such as seda
                         if (consumer instanceof ShutdownAware) {
                             inflight += ((ShutdownAware) consumer).getPendingExchangesSize();
                         }
-                        if (inflight > 0) {
-                            size += inflight;
-                            LOG.trace("{} inflight and pending exchanges for consumer: {}", inflight, consumer);
-                        }
+                    }
+                    if (inflight > 0) {
+                        size += inflight;
+                        LOG.trace("{} inflight and pending exchanges for route: {}", inflight, order.getRoute().getId());
                     }
                 }
                 if (size > 0) {

Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java?rev=1298448&r1=1298447&r2=1298448&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java (original)
+++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java Thu Mar  8 16:08:07 2012
@@ -75,9 +75,15 @@ public class EventDrivenConsumerRoute ex
     @SuppressWarnings("unchecked")
     public Navigate<Processor> navigate() {
         Processor answer = getProcessor();
+
+        // we do not want to navigate the instrument and inflight processors
+        // which is the first 2 delegate async processors, so skip them
         // skip the instrumentation processor if this route was wrapped by one
         if (answer instanceof DelegateAsyncProcessor) {
             answer = ((DelegateAsyncProcessor) answer).getProcessor();
+            if (answer instanceof DelegateAsyncProcessor) {
+                answer = ((DelegateAsyncProcessor) answer).getProcessor();
+            }
         }
 
         if (answer instanceof Navigate) {

Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/RouteService.java?rev=1298448&r1=1298447&r2=1298448&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/RouteService.java (original)
+++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/RouteService.java Thu Mar  8 16:08:07 2012
@@ -265,6 +265,11 @@ public class RouteService extends ChildS
         for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {
             strategy.onRoutesRemove(routes);
         }
+        
+        // remove the routes from the inflight registry
+        for (Route route : routes) {
+            camelContext.getInflightRepository().removeRoute(route.getId());
+        }
 
         // remove the routes from the collections
         camelContext.removeRouteCollection(routes);

Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java?rev=1298448&r1=1298447&r2=1298448&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java (original)
+++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java Thu Mar  8 16:08:07 2012
@@ -107,7 +107,7 @@ public class ThrottlingInflightRoutePoli
         // this works the best when this logic is executed when the exchange is done
         Consumer consumer = route.getConsumer();
 
-        int size = getSize(consumer, exchange);
+        int size = getSize(route, exchange);
         boolean stop = maxInflightExchanges > 0 && size > maxInflightExchanges;
         if (log.isTraceEnabled()) {
             log.trace("{} > 0 && {} > {} evaluated as {}", new Object[]{maxInflightExchanges, size, maxInflightExchanges, stop});
@@ -125,7 +125,7 @@ public class ThrottlingInflightRoutePoli
 
         // reload size in case a race condition with too many at once being invoked
         // so we need to ensure that we read the most current size and start the consumer if we are already to low
-        size = getSize(consumer, exchange);
+        size = getSize(route, exchange);
         boolean start = size <= resumeInflightExchanges;
         if (log.isTraceEnabled()) {
             log.trace("{} <= {} evaluated as {}", new Object[]{size, resumeInflightExchanges, start});
@@ -229,12 +229,11 @@ public class ThrottlingInflightRoutePoli
         return new CamelLogger(LoggerFactory.getLogger(ThrottlingInflightRoutePolicy.class), getLoggingLevel());
     }
 
-    private int getSize(Consumer consumer, Exchange exchange) {
+    private int getSize(Route route, Exchange exchange) {
         if (scope == ThrottlingScope.Context) {
             return exchange.getContext().getInflightRepository().size();
         } else {
-            Endpoint endpoint = consumer.getEndpoint();
-            return exchange.getContext().getInflightRepository().size(endpoint);
+            return exchange.getContext().getInflightRepository().size(route.getId());
         }
     }
 

Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java?rev=1298448&r1=1298447&r2=1298448&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java (original)
+++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java Thu Mar  8 16:08:07 2012
@@ -42,7 +42,11 @@ public class ManagedConsumer extends Man
     }
 
     public Integer getInflightExchanges() {
-        return getContext().getInflightRepository().size(consumer.getEndpoint());
+        if (getRouteId() != null) {
+            return getContext().getInflightRepository().size(getRouteId());
+        } else {
+            return null;
+        }
     }
 
 }

Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java?rev=1298448&r1=1298447&r2=1298448&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java (original)
+++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java Thu Mar  8 16:08:07 2012
@@ -85,11 +85,7 @@ public class ManagedRoute extends Manage
     }
 
     public Integer getInflightExchanges() {
-        if (route.getEndpoint() != null) {
-            return context.getInflightRepository().size(route.getEndpoint());
-        } else {
-            return null;
-        }
+        return context.getInflightRepository().size(route.getId());
     }
 
     public String getCamelId() {

Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java?rev=1298448&r1=1298447&r2=1298448&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java (original)
+++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java Thu Mar  8 16:08:07 2012
@@ -28,20 +28,36 @@ import org.apache.camel.Service;
 public interface InflightRepository extends Service {
 
     /**
-     * Adds the exchange to the inflight registry
+     * Adds the exchange to the inflight registry to the total counter
      *
      * @param exchange  the exchange
      */
     void add(Exchange exchange);
 
     /**
-     * Removes the exchange from the inflight registry
+     * Removes the exchange from the inflight registry to the total counter
      *
      * @param exchange  the exchange
      */
     void remove(Exchange exchange);
 
     /**
+     * Adds the exchange to the inflight registry associated to the given route
+     *
+     * @param exchange  the exchange
+     * @param routeId the id of the route
+     */
+    void add(Exchange exchange, String routeId);
+
+    /**
+     * Removes the exchange from the inflight registry removing association to the given route
+     *
+     * @param exchange  the exchange
+     * @param routeId the id of the route
+     */
+    void remove(Exchange exchange, String routeId);
+
+    /**
      * Current size of inflight exchanges.
      * <p/>
      * Will return 0 if there are no inflight exchanges.
@@ -51,13 +67,29 @@ public interface InflightRepository exte
     int size();
 
     /**
-     * Current size of inflight exchanges which are from the given endpoint.
+     * Will always return 0 due method is deprecated.
+     * @deprecated will be removed in a future Camel release.
+     */
+    @Deprecated
+    int size(Endpoint endpoint);
+
+    /**
+     * Removes the route from the in flight registry.
+     * <p/>
+     * Is used for cleaning up resources to avoid leaking.
+     *
+     * @param routeId the id of the route
+     */
+    void removeRoute(String routeId);
+
+    /**
+    * Current size of inflight exchanges which are from the given route.
      * <p/>
      * Will return 0 if there are no inflight exchanges.
      *
-     * @param endpoint the endpoint where the {@link Exchange} are from.
+     * @param routeId the id of the route
      * @return number of exchanges currently in flight.
      */
-    int size(Endpoint endpoint);
+    int size(String routeId);
 
 }

Modified: camel/branches/camel-2.9.x/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java?rev=1298448&r1=1298447&r2=1298448&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java (original)
+++ camel/branches/camel-2.9.x/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java Thu Mar  8 16:08:07 2012
@@ -25,7 +25,6 @@ import org.apache.camel.builder.RouteBui
  */
 public class InflightRepositoryRouteTest extends ContextTestSupport {
 
-
     public void testInflight() throws Exception {
         context.setInflightRepository(new MyInflightRepo());
 
@@ -34,8 +33,7 @@ public class InflightRepositoryRouteTest
         template.sendBody("direct:start", "Hello World");
 
         assertEquals(0, context.getInflightRepository().size());
-        assertEquals(0, context.getInflightRepository().size(context.getEndpoint("direct:start")));
-        assertEquals(0, context.getInflightRepository().size(context.getEndpoint("mock:result")));
+        assertEquals(0, context.getInflightRepository().size("foo"));
     }
 
     @Override
@@ -43,7 +41,7 @@ public class InflightRepositoryRouteTest
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("direct:start").to("mock:result");
+                from("direct:start").routeId("foo").to("mock:result");
             }
         };
     }
@@ -53,18 +51,13 @@ public class InflightRepositoryRouteTest
         @Override
         public void add(Exchange exchange) {
             super.add(exchange);
-
             assertEquals(1, context.getInflightRepository().size());
-
-            assertEquals(1, size(context.getEndpoint("direct:start")));
-
-            // but 0 from this endpoint
-            assertEquals(0, size(context.getEndpoint("mock:result")));
         }
 
         @Override
-        public void remove(Exchange exchange) {
-            super.remove(exchange);
+        public void add(Exchange exchange, String routeId) {
+            super.add(exchange, routeId);
+            assertEquals(1, context.getInflightRepository().size("foo"));
         }
     }
 }


Reply via email to