Author: davsclaus Date: Thu Mar 8 16:08:07 2012 New Revision: 1298448 URL: http://svn.apache.org/viewvc?rev=1298448&view=rev Log: CAMEL-5057: Avoid synchronized in inflight registry.
Added: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java - copied unchanged from r1298447, camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java Removed: camel/branches/camel-2.9.x/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryConcurrentTest.java Modified: camel/branches/camel-2.9.x/ (props changed) camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/RouteService.java camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java camel/branches/camel-2.9.x/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java Propchange: camel/branches/camel-2.9.x/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Mar 8 16:08:07 2012 @@ -1 +1 @@ -/camel/trunk:1243046,1243057,1243234,1244518,1244644,1244859,1244861,1244864,1244870,1244872,1245021,1291555,1291727,1291848,1291864,1292114,1292384,1292725,1292760,1292767,1293079,1293268,1293288,1293330,1293590,1293828,1293852,1293855,1294130,1294482,1294502,1294533,1294588,1294639,1294709,1294909,1294976,1295073,1295108,1295120,1296653,1296790,1298125,1298155 +/camel/trunk:1243046,1243057,1243234,1244518,1244644,1244859,1244861,1244864,1244870,1244872,1245021,1291555,1291727,1291848,1291864,1292114,1292384,1292725,1292760,1292767,1293079,1293268,1293288,1293330,1293590,1293828,1293852,1293855,1294130,1294482,1294502,1294533,1294588,1294639,1294709,1294909,1294976,1295073,1295108,1295120,1296653,1296790,1298125,1298155,1298447 Propchange: camel/branches/camel-2.9.x/ ------------------------------------------------------------------------------ Binary property 'svnmerge-integrated' - no diff available. Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java?rev=1298448&r1=1298447&r2=1298448&view=diff ============================================================================== --- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java (original) +++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java Thu Mar 8 16:08:07 2012 @@ -16,8 +16,8 @@ */ package org.apache.camel.impl; -import java.util.HashMap; -import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.Endpoint; @@ -36,63 +36,48 @@ public class DefaultInflightRepository e private static final transient Logger LOG = LoggerFactory.getLogger(DefaultInflightRepository.class); private final AtomicInteger totalCount = new AtomicInteger(); - // use endpoint key as key so endpoints with lenient properties is registered using the same key (eg dynamic http endpoints) - private final Map<String, AtomicInteger> endpointCount = new HashMap<String, AtomicInteger>(); + private final ConcurrentMap<String, AtomicInteger> routeCount = new ConcurrentHashMap<String, AtomicInteger>(); public void add(Exchange exchange) { - int count = totalCount.incrementAndGet(); - LOG.trace("Total {} inflight exchanges. Last added: {}", count, exchange.getExchangeId()); - - if (exchange.getFromEndpoint() == null) { - return; - } - - String key = exchange.getFromEndpoint().getEndpointKey(); - // need to be synchronized as we can concurrently add/remove - synchronized (endpointCount) { - AtomicInteger existing = endpointCount.get(key); - if (existing != null) { - existing.incrementAndGet(); - } else { - endpointCount.put(key, new AtomicInteger(1)); - } - } + totalCount.incrementAndGet(); } public void remove(Exchange exchange) { - int count = totalCount.decrementAndGet(); - LOG.trace("Total {} inflight exchanges. Last removed: {}", count, exchange.getExchangeId()); - - if (exchange.getFromEndpoint() == null) { - return; - } + totalCount.decrementAndGet(); + } - String key = exchange.getFromEndpoint().getEndpointKey(); - // need to be synchronized as we can concurrently add/remove - synchronized (endpointCount) { - AtomicInteger existing = endpointCount.get(key); - if (existing != null) { - if (existing.decrementAndGet() <= 0) { - endpointCount.remove(key); - } - } + public void add(Exchange exchange, String routeId) { + AtomicInteger existing = routeCount.putIfAbsent(routeId, new AtomicInteger(1)); + if (existing != null) { + existing.incrementAndGet(); } } - /** - * Internal only - Used for testing purpose. - */ - int endpointSize() { - return endpointCount.size(); + public void remove(Exchange exchange, String routeId) { + AtomicInteger existing = routeCount.get(routeId); + if (existing != null) { + existing.decrementAndGet(); + } } public int size() { return totalCount.get(); } + @Deprecated public int size(Endpoint endpoint) { - AtomicInteger answer = endpointCount.get(endpoint.getEndpointKey()); - return answer != null ? answer.get() : 0; + return 0; + } + + @Override + public void removeRoute(String routeId) { + routeCount.remove(routeId); + } + + @Override + public int size(String routeId) { + AtomicInteger existing = routeCount.get(routeId); + return existing != null ? existing.get() : 0; } @Override @@ -107,8 +92,6 @@ public class DefaultInflightRepository e } else { LOG.info("Shutting down with no inflight exchanges."); } - synchronized (endpointCount) { - endpointCount.clear(); - } + routeCount.clear(); } } Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java?rev=1298448&r1=1298447&r2=1298448&view=diff ============================================================================== --- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java (original) +++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java Thu Mar 8 16:08:07 2012 @@ -35,6 +35,7 @@ import org.apache.camel.model.FromDefini import org.apache.camel.model.ProcessorDefinition; import org.apache.camel.model.RouteDefinition; import org.apache.camel.processor.Pipeline; +import org.apache.camel.processor.RouteInflightRepositoryProcessor; import org.apache.camel.processor.RoutePolicyProcessor; import org.apache.camel.processor.UnitOfWorkProcessor; import org.apache.camel.spi.InterceptStrategy; @@ -168,14 +169,17 @@ public class DefaultRouteContext impleme target = unitOfWorkProcessor; } + // wrap in route inflight processor to track number of inflight exchanges for the route + RouteInflightRepositoryProcessor inflight = new RouteInflightRepositoryProcessor(camelContext.getInflightRepository(), target); + // and wrap it by a instrumentation processor that is to be used for performance stats // for this particular route - InstrumentationProcessor wrapper = new InstrumentationProcessor(); - wrapper.setType("route"); - wrapper.setProcessor(target); + InstrumentationProcessor instrument = new InstrumentationProcessor(); + instrument.setType("route"); + instrument.setProcessor(inflight); // and create the route that wraps the UoW - Route edcr = new EventDrivenConsumerRoute(this, getEndpoint(), wrapper); + Route edcr = new EventDrivenConsumerRoute(this, getEndpoint(), instrument); // create the route id String routeId = route.idOrCreate(getCamelContext().getNodeIdFactory()); edcr.getProperties().put(Route.ID_PROPERTY, routeId); @@ -188,6 +192,8 @@ public class DefaultRouteContext impleme if (routePolicyProcessor != null) { routePolicyProcessor.setRoute(edcr); } + // after the route is created then set the route on the inflight processor so we get hold of it + inflight.setRoute(edcr); // invoke init on route policy if (routePolicyList != null && !routePolicyList.isEmpty()) { Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java?rev=1298448&r1=1298447&r2=1298448&view=diff ============================================================================== --- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java (original) +++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java Thu Mar 8 16:08:07 2012 @@ -432,17 +432,17 @@ public class DefaultShutdownStrategy ext while (!done) { int size = 0; for (RouteStartupOrder order : routes) { + int inflight = context.getInflightRepository().size(order.getRoute().getId()); for (Consumer consumer : order.getInputs()) { - int inflight = context.getInflightRepository().size(consumer.getEndpoint()); // include any additional pending exchanges on some consumers which may have internal // memory queues such as seda if (consumer instanceof ShutdownAware) { inflight += ((ShutdownAware) consumer).getPendingExchangesSize(); } - if (inflight > 0) { - size += inflight; - LOG.trace("{} inflight and pending exchanges for consumer: {}", inflight, consumer); - } + } + if (inflight > 0) { + size += inflight; + LOG.trace("{} inflight and pending exchanges for route: {}", inflight, order.getRoute().getId()); } } if (size > 0) { Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java?rev=1298448&r1=1298447&r2=1298448&view=diff ============================================================================== --- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java (original) +++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java Thu Mar 8 16:08:07 2012 @@ -75,9 +75,15 @@ public class EventDrivenConsumerRoute ex @SuppressWarnings("unchecked") public Navigate<Processor> navigate() { Processor answer = getProcessor(); + + // we do not want to navigate the instrument and inflight processors + // which is the first 2 delegate async processors, so skip them // skip the instrumentation processor if this route was wrapped by one if (answer instanceof DelegateAsyncProcessor) { answer = ((DelegateAsyncProcessor) answer).getProcessor(); + if (answer instanceof DelegateAsyncProcessor) { + answer = ((DelegateAsyncProcessor) answer).getProcessor(); + } } if (answer instanceof Navigate) { Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/RouteService.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/RouteService.java?rev=1298448&r1=1298447&r2=1298448&view=diff ============================================================================== --- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/RouteService.java (original) +++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/RouteService.java Thu Mar 8 16:08:07 2012 @@ -265,6 +265,11 @@ public class RouteService extends ChildS for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) { strategy.onRoutesRemove(routes); } + + // remove the routes from the inflight registry + for (Route route : routes) { + camelContext.getInflightRepository().removeRoute(route.getId()); + } // remove the routes from the collections camelContext.removeRouteCollection(routes); Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java?rev=1298448&r1=1298447&r2=1298448&view=diff ============================================================================== --- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java (original) +++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java Thu Mar 8 16:08:07 2012 @@ -107,7 +107,7 @@ public class ThrottlingInflightRoutePoli // this works the best when this logic is executed when the exchange is done Consumer consumer = route.getConsumer(); - int size = getSize(consumer, exchange); + int size = getSize(route, exchange); boolean stop = maxInflightExchanges > 0 && size > maxInflightExchanges; if (log.isTraceEnabled()) { log.trace("{} > 0 && {} > {} evaluated as {}", new Object[]{maxInflightExchanges, size, maxInflightExchanges, stop}); @@ -125,7 +125,7 @@ public class ThrottlingInflightRoutePoli // reload size in case a race condition with too many at once being invoked // so we need to ensure that we read the most current size and start the consumer if we are already to low - size = getSize(consumer, exchange); + size = getSize(route, exchange); boolean start = size <= resumeInflightExchanges; if (log.isTraceEnabled()) { log.trace("{} <= {} evaluated as {}", new Object[]{size, resumeInflightExchanges, start}); @@ -229,12 +229,11 @@ public class ThrottlingInflightRoutePoli return new CamelLogger(LoggerFactory.getLogger(ThrottlingInflightRoutePolicy.class), getLoggingLevel()); } - private int getSize(Consumer consumer, Exchange exchange) { + private int getSize(Route route, Exchange exchange) { if (scope == ThrottlingScope.Context) { return exchange.getContext().getInflightRepository().size(); } else { - Endpoint endpoint = consumer.getEndpoint(); - return exchange.getContext().getInflightRepository().size(endpoint); + return exchange.getContext().getInflightRepository().size(route.getId()); } } Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java?rev=1298448&r1=1298447&r2=1298448&view=diff ============================================================================== --- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java (original) +++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java Thu Mar 8 16:08:07 2012 @@ -42,7 +42,11 @@ public class ManagedConsumer extends Man } public Integer getInflightExchanges() { - return getContext().getInflightRepository().size(consumer.getEndpoint()); + if (getRouteId() != null) { + return getContext().getInflightRepository().size(getRouteId()); + } else { + return null; + } } } Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java?rev=1298448&r1=1298447&r2=1298448&view=diff ============================================================================== --- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java (original) +++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java Thu Mar 8 16:08:07 2012 @@ -85,11 +85,7 @@ public class ManagedRoute extends Manage } public Integer getInflightExchanges() { - if (route.getEndpoint() != null) { - return context.getInflightRepository().size(route.getEndpoint()); - } else { - return null; - } + return context.getInflightRepository().size(route.getId()); } public String getCamelId() { Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java?rev=1298448&r1=1298447&r2=1298448&view=diff ============================================================================== --- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java (original) +++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java Thu Mar 8 16:08:07 2012 @@ -28,20 +28,36 @@ import org.apache.camel.Service; public interface InflightRepository extends Service { /** - * Adds the exchange to the inflight registry + * Adds the exchange to the inflight registry to the total counter * * @param exchange the exchange */ void add(Exchange exchange); /** - * Removes the exchange from the inflight registry + * Removes the exchange from the inflight registry to the total counter * * @param exchange the exchange */ void remove(Exchange exchange); /** + * Adds the exchange to the inflight registry associated to the given route + * + * @param exchange the exchange + * @param routeId the id of the route + */ + void add(Exchange exchange, String routeId); + + /** + * Removes the exchange from the inflight registry removing association to the given route + * + * @param exchange the exchange + * @param routeId the id of the route + */ + void remove(Exchange exchange, String routeId); + + /** * Current size of inflight exchanges. * <p/> * Will return 0 if there are no inflight exchanges. @@ -51,13 +67,29 @@ public interface InflightRepository exte int size(); /** - * Current size of inflight exchanges which are from the given endpoint. + * Will always return 0 due method is deprecated. + * @deprecated will be removed in a future Camel release. + */ + @Deprecated + int size(Endpoint endpoint); + + /** + * Removes the route from the in flight registry. + * <p/> + * Is used for cleaning up resources to avoid leaking. + * + * @param routeId the id of the route + */ + void removeRoute(String routeId); + + /** + * Current size of inflight exchanges which are from the given route. * <p/> * Will return 0 if there are no inflight exchanges. * - * @param endpoint the endpoint where the {@link Exchange} are from. + * @param routeId the id of the route * @return number of exchanges currently in flight. */ - int size(Endpoint endpoint); + int size(String routeId); } Modified: camel/branches/camel-2.9.x/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java?rev=1298448&r1=1298447&r2=1298448&view=diff ============================================================================== --- camel/branches/camel-2.9.x/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java (original) +++ camel/branches/camel-2.9.x/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java Thu Mar 8 16:08:07 2012 @@ -25,7 +25,6 @@ import org.apache.camel.builder.RouteBui */ public class InflightRepositoryRouteTest extends ContextTestSupport { - public void testInflight() throws Exception { context.setInflightRepository(new MyInflightRepo()); @@ -34,8 +33,7 @@ public class InflightRepositoryRouteTest template.sendBody("direct:start", "Hello World"); assertEquals(0, context.getInflightRepository().size()); - assertEquals(0, context.getInflightRepository().size(context.getEndpoint("direct:start"))); - assertEquals(0, context.getInflightRepository().size(context.getEndpoint("mock:result"))); + assertEquals(0, context.getInflightRepository().size("foo")); } @Override @@ -43,7 +41,7 @@ public class InflightRepositoryRouteTest return new RouteBuilder() { @Override public void configure() throws Exception { - from("direct:start").to("mock:result"); + from("direct:start").routeId("foo").to("mock:result"); } }; } @@ -53,18 +51,13 @@ public class InflightRepositoryRouteTest @Override public void add(Exchange exchange) { super.add(exchange); - assertEquals(1, context.getInflightRepository().size()); - - assertEquals(1, size(context.getEndpoint("direct:start"))); - - // but 0 from this endpoint - assertEquals(0, size(context.getEndpoint("mock:result"))); } @Override - public void remove(Exchange exchange) { - super.remove(exchange); + public void add(Exchange exchange, String routeId) { + super.add(exchange, routeId); + assertEquals(1, context.getInflightRepository().size("foo")); } } }