Author: davsclaus
Date: Thu Mar  8 16:02:04 2012
New Revision: 1298447

URL: http://svn.apache.org/viewvc?rev=1298447&view=rev
Log:
CAMEL-5057: Avoid synchronized in inflight registry.

Added:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java
Removed:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryConcurrentTest.java
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java?rev=1298447&r1=1298446&r2=1298447&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
 Thu Mar  8 16:02:04 2012
@@ -16,8 +16,8 @@
  */
 package org.apache.camel.impl;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.Endpoint;
@@ -36,63 +36,48 @@ public class DefaultInflightRepository e
 
     private static final transient Logger LOG = 
LoggerFactory.getLogger(DefaultInflightRepository.class);
     private final AtomicInteger totalCount = new AtomicInteger();
-    // use endpoint key as key so endpoints with lenient properties is 
registered using the same key (eg dynamic http endpoints)
-    private final Map<String, AtomicInteger> endpointCount = new 
HashMap<String, AtomicInteger>();
+    private final ConcurrentMap<String, AtomicInteger> routeCount = new 
ConcurrentHashMap<String, AtomicInteger>();
 
     public void add(Exchange exchange) {
-        int count = totalCount.incrementAndGet();
-        LOG.trace("Total {} inflight exchanges. Last added: {}", count, 
exchange.getExchangeId());
-
-        if (exchange.getFromEndpoint() == null) {
-            return;
-        }
-
-        String key = exchange.getFromEndpoint().getEndpointKey();
-        // need to be synchronized as we can concurrently add/remove
-        synchronized (endpointCount) {
-            AtomicInteger existing = endpointCount.get(key);
-            if (existing != null) {
-                existing.incrementAndGet();
-            } else {
-                endpointCount.put(key, new AtomicInteger(1));
-            }
-        }
+        totalCount.incrementAndGet();
     }
 
     public void remove(Exchange exchange) {
-        int count = totalCount.decrementAndGet();
-        LOG.trace("Total {} inflight exchanges. Last removed: {}", count, 
exchange.getExchangeId());
-
-        if (exchange.getFromEndpoint() == null) {
-            return;
-        }
+        totalCount.decrementAndGet();
+    }
 
-        String key = exchange.getFromEndpoint().getEndpointKey();
-        // need to be synchronized as we can concurrently add/remove
-        synchronized (endpointCount) {
-            AtomicInteger existing = endpointCount.get(key);
-            if (existing != null) {
-                if (existing.decrementAndGet() <= 0) {
-                    endpointCount.remove(key);
-                }
-            }
+    public void add(Exchange exchange, String routeId) {
+        AtomicInteger existing = routeCount.putIfAbsent(routeId, new 
AtomicInteger(1));
+        if (existing != null) {
+            existing.incrementAndGet();
         }
     }
 
-    /**
-     * Internal only - Used for testing purpose.
-     */
-    int endpointSize() {
-        return endpointCount.size();
+    public void remove(Exchange exchange, String routeId) {
+        AtomicInteger existing = routeCount.get(routeId);
+        if (existing != null) {
+            existing.decrementAndGet();
+        }
     }
 
     public int size() {
         return totalCount.get();
     }
 
+    @Deprecated
     public int size(Endpoint endpoint) {
-        AtomicInteger answer = endpointCount.get(endpoint.getEndpointKey());
-        return answer != null ? answer.get() : 0;
+        return 0;
+    }
+
+    @Override
+    public void removeRoute(String routeId) {
+        routeCount.remove(routeId);
+    }
+
+    @Override
+    public int size(String routeId) {
+        AtomicInteger existing = routeCount.get(routeId);
+        return existing != null ? existing.get() : 0; 
     }
 
     @Override
@@ -107,8 +92,6 @@ public class DefaultInflightRepository e
         } else {
             LOG.debug("Shutting down with no inflight exchanges.");
         }
-        synchronized (endpointCount) {
-            endpointCount.clear();
-        }
+        routeCount.clear();
     }
 }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java?rev=1298447&r1=1298446&r2=1298447&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
 Thu Mar  8 16:02:04 2012
@@ -35,6 +35,7 @@ import org.apache.camel.model.FromDefini
 import org.apache.camel.model.ProcessorDefinition;
 import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.processor.Pipeline;
+import org.apache.camel.processor.RouteInflightRepositoryProcessor;
 import org.apache.camel.processor.RoutePolicyProcessor;
 import org.apache.camel.processor.UnitOfWorkProcessor;
 import org.apache.camel.spi.InterceptStrategy;
@@ -168,14 +169,17 @@ public class DefaultRouteContext impleme
                 target = unitOfWorkProcessor;
             }
 
+            // wrap in route inflight processor to track number of inflight 
exchanges for the route
+            RouteInflightRepositoryProcessor inflight = new 
RouteInflightRepositoryProcessor(camelContext.getInflightRepository(), target);
+
             // and wrap it by a instrumentation processor that is to be used 
for performance stats
             // for this particular route
-            InstrumentationProcessor wrapper = new InstrumentationProcessor();
-            wrapper.setType("route");
-            wrapper.setProcessor(target);
+            InstrumentationProcessor instrument = new 
InstrumentationProcessor();
+            instrument.setType("route");
+            instrument.setProcessor(inflight);
 
             // and create the route that wraps the UoW
-            Route edcr = new EventDrivenConsumerRoute(this, getEndpoint(), 
wrapper);
+            Route edcr = new EventDrivenConsumerRoute(this, getEndpoint(), 
instrument);
             // create the route id
             String routeId = 
route.idOrCreate(getCamelContext().getNodeIdFactory());
             edcr.getProperties().put(Route.ID_PROPERTY, routeId);
@@ -188,6 +192,8 @@ public class DefaultRouteContext impleme
             if (routePolicyProcessor != null) {
                 routePolicyProcessor.setRoute(edcr);
             }
+            // after the route is created then set the route on the inflight 
processor so we get hold of it
+            inflight.setRoute(edcr);
 
             // invoke init on route policy
             if (routePolicyList != null && !routePolicyList.isEmpty()) {

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java?rev=1298447&r1=1298446&r2=1298447&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
 Thu Mar  8 16:02:04 2012
@@ -504,17 +504,17 @@ public class DefaultShutdownStrategy ext
             while (!done) {
                 int size = 0;
                 for (RouteStartupOrder order : routes) {
+                    int inflight = 
context.getInflightRepository().size(order.getRoute().getId());
                     for (Consumer consumer : order.getInputs()) {
-                        int inflight = 
context.getInflightRepository().size(consumer.getEndpoint());
                         // include any additional pending exchanges on some 
consumers which may have internal
                         // memory queues such as seda
                         if (consumer instanceof ShutdownAware) {
                             inflight += ((ShutdownAware) 
consumer).getPendingExchangesSize();
                         }
-                        if (inflight > 0) {
-                            size += inflight;
-                            LOG.trace("{} inflight and pending exchanges for 
consumer: {}", inflight, consumer);
-                        }
+                    }
+                    if (inflight > 0) {
+                        size += inflight;
+                        LOG.trace("{} inflight and pending exchanges for 
route: {}", inflight, order.getRoute().getId());
                     }
                 }
                 if (size > 0) {

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java?rev=1298447&r1=1298446&r2=1298447&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java
 Thu Mar  8 16:02:04 2012
@@ -75,9 +75,15 @@ public class EventDrivenConsumerRoute ex
     @SuppressWarnings("unchecked")
     public Navigate<Processor> navigate() {
         Processor answer = getProcessor();
+
+        // we do not want to navigate the instrument and inflight processors
+        // which is the first 2 delegate async processors, so skip them
         // skip the instrumentation processor if this route was wrapped by one
         if (answer instanceof DelegateAsyncProcessor) {
             answer = ((DelegateAsyncProcessor) answer).getProcessor();
+            if (answer instanceof DelegateAsyncProcessor) {
+                answer = ((DelegateAsyncProcessor) answer).getProcessor();
+            }
         }
 
         if (answer instanceof Navigate) {

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java?rev=1298447&r1=1298446&r2=1298447&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java 
(original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java 
Thu Mar  8 16:02:04 2012
@@ -272,6 +272,11 @@ public class RouteService extends ChildS
         for (LifecycleStrategy strategy : 
camelContext.getLifecycleStrategies()) {
             strategy.onRoutesRemove(routes);
         }
+        
+        // remove the routes from the inflight registry
+        for (Route route : routes) {
+            camelContext.getInflightRepository().removeRoute(route.getId());
+        }
 
         // remove the routes from the collections
         camelContext.removeRouteCollection(routes);

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java?rev=1298447&r1=1298446&r2=1298447&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java
 Thu Mar  8 16:02:04 2012
@@ -107,7 +107,7 @@ public class ThrottlingInflightRoutePoli
         // this works the best when this logic is executed when the exchange 
is done
         Consumer consumer = route.getConsumer();
 
-        int size = getSize(consumer, exchange);
+        int size = getSize(route, exchange);
         boolean stop = maxInflightExchanges > 0 && size > maxInflightExchanges;
         if (log.isTraceEnabled()) {
             log.trace("{} > 0 && {} > {} evaluated as {}", new 
Object[]{maxInflightExchanges, size, maxInflightExchanges, stop});
@@ -125,7 +125,7 @@ public class ThrottlingInflightRoutePoli
 
         // reload size in case a race condition with too many at once being 
invoked
         // so we need to ensure that we read the most current size and start 
the consumer if we are already to low
-        size = getSize(consumer, exchange);
+        size = getSize(route, exchange);
         boolean start = size <= resumeInflightExchanges;
         if (log.isTraceEnabled()) {
             log.trace("{} <= {} evaluated as {}", new Object[]{size, 
resumeInflightExchanges, start});
@@ -229,12 +229,11 @@ public class ThrottlingInflightRoutePoli
         return new 
CamelLogger(LoggerFactory.getLogger(ThrottlingInflightRoutePolicy.class), 
getLoggingLevel());
     }
 
-    private int getSize(Consumer consumer, Exchange exchange) {
+    private int getSize(Route route, Exchange exchange) {
         if (scope == ThrottlingScope.Context) {
             return exchange.getContext().getInflightRepository().size();
         } else {
-            Endpoint endpoint = consumer.getEndpoint();
-            return 
exchange.getContext().getInflightRepository().size(endpoint);
+            return 
exchange.getContext().getInflightRepository().size(route.getId());
         }
     }
 

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java?rev=1298447&r1=1298446&r2=1298447&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java
 Thu Mar  8 16:02:04 2012
@@ -42,7 +42,11 @@ public class ManagedConsumer extends Man
     }
 
     public Integer getInflightExchanges() {
-        return 
getContext().getInflightRepository().size(consumer.getEndpoint());
+        if (getRouteId() != null) {
+            return getContext().getInflightRepository().size(getRouteId());
+        } else {
+            return null;
+        }
     }
 
 }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java?rev=1298447&r1=1298446&r2=1298447&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
 Thu Mar  8 16:02:04 2012
@@ -90,11 +90,7 @@ public class ManagedRoute extends Manage
     }
 
     public Integer getInflightExchanges() {
-        if (route.getEndpoint() != null) {
-            return context.getInflightRepository().size(route.getEndpoint());
-        } else {
-            return null;
-        }
+        return context.getInflightRepository().size(route.getId());
     }
 
     public String getCamelId() {

Added: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java?rev=1298447&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java
 (added)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java
 Thu Mar  8 16:02:04 2012
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Route;
+import org.apache.camel.spi.InflightRepository;
+import org.apache.camel.spi.RouteContext;
+
+/**
+ * This processor tracks the current {@link RouteContext} while processing the 
{@link Exchange}.
+ * This ensures that the {@link Exchange} have details under which route its 
being currently processed.
+ */
+public class RouteInflightRepositoryProcessor extends DelegateAsyncProcessor {
+    
+    private final InflightRepository inflightRepository;
+    private Route route;
+    private String id;
+
+    public RouteInflightRepositoryProcessor(InflightRepository 
inflightRepository, Processor processor) {
+        super(processor);
+        this.inflightRepository = inflightRepository;
+    }
+
+    public void setRoute(Route route) {
+        this.route = route;
+        this.id = route.getId();
+    }
+
+    @Override
+    protected boolean processNext(final Exchange exchange, final AsyncCallback 
callback) {
+        inflightRepository.add(exchange, id);
+        
+        boolean sync = processor.process(exchange, new AsyncCallback() {
+            public void done(boolean doneSync) {
+                try {
+                    inflightRepository.remove(exchange, id);
+                } finally {
+                    callback.done(doneSync);
+                }
+            }
+        });
+        return sync;
+    }
+
+    @Override
+    public String toString() {
+        return super.toString();
+    }
+
+}

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java?rev=1298447&r1=1298446&r2=1298447&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java
 Thu Mar  8 16:02:04 2012
@@ -28,20 +28,36 @@ import org.apache.camel.Service;
 public interface InflightRepository extends Service {
 
     /**
-     * Adds the exchange to the inflight registry
+     * Adds the exchange to the inflight registry to the total counter
      *
      * @param exchange  the exchange
      */
     void add(Exchange exchange);
 
     /**
-     * Removes the exchange from the inflight registry
+     * Removes the exchange from the inflight registry to the total counter
      *
      * @param exchange  the exchange
      */
     void remove(Exchange exchange);
 
     /**
+     * Adds the exchange to the inflight registry associated to the given route
+     *
+     * @param exchange  the exchange
+     * @param routeId the id of the route
+     */
+    void add(Exchange exchange, String routeId);
+
+    /**
+     * Removes the exchange from the inflight registry removing association to 
the given route
+     *
+     * @param exchange  the exchange
+     * @param routeId the id of the route
+     */
+    void remove(Exchange exchange, String routeId);
+
+    /**
      * Current size of inflight exchanges.
      * <p/>
      * Will return 0 if there are no inflight exchanges.
@@ -51,13 +67,29 @@ public interface InflightRepository exte
     int size();
 
     /**
-     * Current size of inflight exchanges which are from the given endpoint.
+     * Will always return 0 due method is deprecated.
+     * @deprecated will be removed in a future Camel release.
+     */
+    @Deprecated
+    int size(Endpoint endpoint);
+
+    /**
+     * Removes the route from the in flight registry.
+     * <p/>
+     * Is used for cleaning up resources to avoid leaking.
+     *
+     * @param routeId the id of the route
+     */
+    void removeRoute(String routeId);
+
+    /**
+    * Current size of inflight exchanges which are from the given route.
      * <p/>
      * Will return 0 if there are no inflight exchanges.
      *
-     * @param endpoint the endpoint where the {@link Exchange} are from.
+     * @param routeId the id of the route
      * @return number of exchanges currently in flight.
      */
-    int size(Endpoint endpoint);
+    int size(String routeId);
 
 }

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java?rev=1298447&r1=1298446&r2=1298447&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java
 Thu Mar  8 16:02:04 2012
@@ -25,7 +25,6 @@ import org.apache.camel.builder.RouteBui
  */
 public class InflightRepositoryRouteTest extends ContextTestSupport {
 
-
     public void testInflight() throws Exception {
         context.setInflightRepository(new MyInflightRepo());
 
@@ -34,8 +33,7 @@ public class InflightRepositoryRouteTest
         template.sendBody("direct:start", "Hello World");
 
         assertEquals(0, context.getInflightRepository().size());
-        assertEquals(0, 
context.getInflightRepository().size(context.getEndpoint("direct:start")));
-        assertEquals(0, 
context.getInflightRepository().size(context.getEndpoint("mock:result")));
+        assertEquals(0, context.getInflightRepository().size("foo"));
     }
 
     @Override
@@ -43,7 +41,7 @@ public class InflightRepositoryRouteTest
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("direct:start").to("mock:result");
+                from("direct:start").routeId("foo").to("mock:result");
             }
         };
     }
@@ -53,18 +51,13 @@ public class InflightRepositoryRouteTest
         @Override
         public void add(Exchange exchange) {
             super.add(exchange);
-
             assertEquals(1, context.getInflightRepository().size());
-
-            assertEquals(1, size(context.getEndpoint("direct:start")));
-
-            // but 0 from this endpoint
-            assertEquals(0, size(context.getEndpoint("mock:result")));
         }
 
         @Override
-        public void remove(Exchange exchange) {
-            super.remove(exchange);
+        public void add(Exchange exchange, String routeId) {
+            super.add(exchange, routeId);
+            assertEquals(1, context.getInflightRepository().size("foo"));
         }
     }
 }


Reply via email to