Author: davsclaus Date: Thu Mar 8 16:02:04 2012 New Revision: 1298447 URL: http://svn.apache.org/viewvc?rev=1298447&view=rev Log: CAMEL-5057: Avoid synchronized in inflight registry.
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java Removed: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryConcurrentTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java camel/trunk/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java?rev=1298447&r1=1298446&r2=1298447&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java Thu Mar 8 16:02:04 2012 @@ -16,8 +16,8 @@ */ package org.apache.camel.impl; -import java.util.HashMap; -import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.Endpoint; @@ -36,63 +36,48 @@ public class DefaultInflightRepository e private static final transient Logger LOG = LoggerFactory.getLogger(DefaultInflightRepository.class); private final AtomicInteger totalCount = new AtomicInteger(); - // use endpoint key as key so endpoints with lenient properties is registered using the same key (eg dynamic http endpoints) - private final Map<String, AtomicInteger> endpointCount = new HashMap<String, AtomicInteger>(); + private final ConcurrentMap<String, AtomicInteger> routeCount = new ConcurrentHashMap<String, AtomicInteger>(); public void add(Exchange exchange) { - int count = totalCount.incrementAndGet(); - LOG.trace("Total {} inflight exchanges. Last added: {}", count, exchange.getExchangeId()); - - if (exchange.getFromEndpoint() == null) { - return; - } - - String key = exchange.getFromEndpoint().getEndpointKey(); - // need to be synchronized as we can concurrently add/remove - synchronized (endpointCount) { - AtomicInteger existing = endpointCount.get(key); - if (existing != null) { - existing.incrementAndGet(); - } else { - endpointCount.put(key, new AtomicInteger(1)); - } - } + totalCount.incrementAndGet(); } public void remove(Exchange exchange) { - int count = totalCount.decrementAndGet(); - LOG.trace("Total {} inflight exchanges. Last removed: {}", count, exchange.getExchangeId()); - - if (exchange.getFromEndpoint() == null) { - return; - } + totalCount.decrementAndGet(); + } - String key = exchange.getFromEndpoint().getEndpointKey(); - // need to be synchronized as we can concurrently add/remove - synchronized (endpointCount) { - AtomicInteger existing = endpointCount.get(key); - if (existing != null) { - if (existing.decrementAndGet() <= 0) { - endpointCount.remove(key); - } - } + public void add(Exchange exchange, String routeId) { + AtomicInteger existing = routeCount.putIfAbsent(routeId, new AtomicInteger(1)); + if (existing != null) { + existing.incrementAndGet(); } } - /** - * Internal only - Used for testing purpose. - */ - int endpointSize() { - return endpointCount.size(); + public void remove(Exchange exchange, String routeId) { + AtomicInteger existing = routeCount.get(routeId); + if (existing != null) { + existing.decrementAndGet(); + } } public int size() { return totalCount.get(); } + @Deprecated public int size(Endpoint endpoint) { - AtomicInteger answer = endpointCount.get(endpoint.getEndpointKey()); - return answer != null ? answer.get() : 0; + return 0; + } + + @Override + public void removeRoute(String routeId) { + routeCount.remove(routeId); + } + + @Override + public int size(String routeId) { + AtomicInteger existing = routeCount.get(routeId); + return existing != null ? existing.get() : 0; } @Override @@ -107,8 +92,6 @@ public class DefaultInflightRepository e } else { LOG.debug("Shutting down with no inflight exchanges."); } - synchronized (endpointCount) { - endpointCount.clear(); - } + routeCount.clear(); } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java?rev=1298447&r1=1298446&r2=1298447&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java Thu Mar 8 16:02:04 2012 @@ -35,6 +35,7 @@ import org.apache.camel.model.FromDefini import org.apache.camel.model.ProcessorDefinition; import org.apache.camel.model.RouteDefinition; import org.apache.camel.processor.Pipeline; +import org.apache.camel.processor.RouteInflightRepositoryProcessor; import org.apache.camel.processor.RoutePolicyProcessor; import org.apache.camel.processor.UnitOfWorkProcessor; import org.apache.camel.spi.InterceptStrategy; @@ -168,14 +169,17 @@ public class DefaultRouteContext impleme target = unitOfWorkProcessor; } + // wrap in route inflight processor to track number of inflight exchanges for the route + RouteInflightRepositoryProcessor inflight = new RouteInflightRepositoryProcessor(camelContext.getInflightRepository(), target); + // and wrap it by a instrumentation processor that is to be used for performance stats // for this particular route - InstrumentationProcessor wrapper = new InstrumentationProcessor(); - wrapper.setType("route"); - wrapper.setProcessor(target); + InstrumentationProcessor instrument = new InstrumentationProcessor(); + instrument.setType("route"); + instrument.setProcessor(inflight); // and create the route that wraps the UoW - Route edcr = new EventDrivenConsumerRoute(this, getEndpoint(), wrapper); + Route edcr = new EventDrivenConsumerRoute(this, getEndpoint(), instrument); // create the route id String routeId = route.idOrCreate(getCamelContext().getNodeIdFactory()); edcr.getProperties().put(Route.ID_PROPERTY, routeId); @@ -188,6 +192,8 @@ public class DefaultRouteContext impleme if (routePolicyProcessor != null) { routePolicyProcessor.setRoute(edcr); } + // after the route is created then set the route on the inflight processor so we get hold of it + inflight.setRoute(edcr); // invoke init on route policy if (routePolicyList != null && !routePolicyList.isEmpty()) { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java?rev=1298447&r1=1298446&r2=1298447&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java Thu Mar 8 16:02:04 2012 @@ -504,17 +504,17 @@ public class DefaultShutdownStrategy ext while (!done) { int size = 0; for (RouteStartupOrder order : routes) { + int inflight = context.getInflightRepository().size(order.getRoute().getId()); for (Consumer consumer : order.getInputs()) { - int inflight = context.getInflightRepository().size(consumer.getEndpoint()); // include any additional pending exchanges on some consumers which may have internal // memory queues such as seda if (consumer instanceof ShutdownAware) { inflight += ((ShutdownAware) consumer).getPendingExchangesSize(); } - if (inflight > 0) { - size += inflight; - LOG.trace("{} inflight and pending exchanges for consumer: {}", inflight, consumer); - } + } + if (inflight > 0) { + size += inflight; + LOG.trace("{} inflight and pending exchanges for route: {}", inflight, order.getRoute().getId()); } } if (size > 0) { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java?rev=1298447&r1=1298446&r2=1298447&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java Thu Mar 8 16:02:04 2012 @@ -75,9 +75,15 @@ public class EventDrivenConsumerRoute ex @SuppressWarnings("unchecked") public Navigate<Processor> navigate() { Processor answer = getProcessor(); + + // we do not want to navigate the instrument and inflight processors + // which is the first 2 delegate async processors, so skip them // skip the instrumentation processor if this route was wrapped by one if (answer instanceof DelegateAsyncProcessor) { answer = ((DelegateAsyncProcessor) answer).getProcessor(); + if (answer instanceof DelegateAsyncProcessor) { + answer = ((DelegateAsyncProcessor) answer).getProcessor(); + } } if (answer instanceof Navigate) { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java?rev=1298447&r1=1298446&r2=1298447&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java Thu Mar 8 16:02:04 2012 @@ -272,6 +272,11 @@ public class RouteService extends ChildS for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) { strategy.onRoutesRemove(routes); } + + // remove the routes from the inflight registry + for (Route route : routes) { + camelContext.getInflightRepository().removeRoute(route.getId()); + } // remove the routes from the collections camelContext.removeRouteCollection(routes); Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java?rev=1298447&r1=1298446&r2=1298447&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java Thu Mar 8 16:02:04 2012 @@ -107,7 +107,7 @@ public class ThrottlingInflightRoutePoli // this works the best when this logic is executed when the exchange is done Consumer consumer = route.getConsumer(); - int size = getSize(consumer, exchange); + int size = getSize(route, exchange); boolean stop = maxInflightExchanges > 0 && size > maxInflightExchanges; if (log.isTraceEnabled()) { log.trace("{} > 0 && {} > {} evaluated as {}", new Object[]{maxInflightExchanges, size, maxInflightExchanges, stop}); @@ -125,7 +125,7 @@ public class ThrottlingInflightRoutePoli // reload size in case a race condition with too many at once being invoked // so we need to ensure that we read the most current size and start the consumer if we are already to low - size = getSize(consumer, exchange); + size = getSize(route, exchange); boolean start = size <= resumeInflightExchanges; if (log.isTraceEnabled()) { log.trace("{} <= {} evaluated as {}", new Object[]{size, resumeInflightExchanges, start}); @@ -229,12 +229,11 @@ public class ThrottlingInflightRoutePoli return new CamelLogger(LoggerFactory.getLogger(ThrottlingInflightRoutePolicy.class), getLoggingLevel()); } - private int getSize(Consumer consumer, Exchange exchange) { + private int getSize(Route route, Exchange exchange) { if (scope == ThrottlingScope.Context) { return exchange.getContext().getInflightRepository().size(); } else { - Endpoint endpoint = consumer.getEndpoint(); - return exchange.getContext().getInflightRepository().size(endpoint); + return exchange.getContext().getInflightRepository().size(route.getId()); } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java?rev=1298447&r1=1298446&r2=1298447&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java Thu Mar 8 16:02:04 2012 @@ -42,7 +42,11 @@ public class ManagedConsumer extends Man } public Integer getInflightExchanges() { - return getContext().getInflightRepository().size(consumer.getEndpoint()); + if (getRouteId() != null) { + return getContext().getInflightRepository().size(getRouteId()); + } else { + return null; + } } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java?rev=1298447&r1=1298446&r2=1298447&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java Thu Mar 8 16:02:04 2012 @@ -90,11 +90,7 @@ public class ManagedRoute extends Manage } public Integer getInflightExchanges() { - if (route.getEndpoint() != null) { - return context.getInflightRepository().size(route.getEndpoint()); - } else { - return null; - } + return context.getInflightRepository().size(route.getId()); } public String getCamelId() { Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java?rev=1298447&view=auto ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java (added) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RouteInflightRepositoryProcessor.java Thu Mar 8 16:02:04 2012 @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.Route; +import org.apache.camel.spi.InflightRepository; +import org.apache.camel.spi.RouteContext; + +/** + * This processor tracks the current {@link RouteContext} while processing the {@link Exchange}. + * This ensures that the {@link Exchange} have details under which route its being currently processed. + */ +public class RouteInflightRepositoryProcessor extends DelegateAsyncProcessor { + + private final InflightRepository inflightRepository; + private Route route; + private String id; + + public RouteInflightRepositoryProcessor(InflightRepository inflightRepository, Processor processor) { + super(processor); + this.inflightRepository = inflightRepository; + } + + public void setRoute(Route route) { + this.route = route; + this.id = route.getId(); + } + + @Override + protected boolean processNext(final Exchange exchange, final AsyncCallback callback) { + inflightRepository.add(exchange, id); + + boolean sync = processor.process(exchange, new AsyncCallback() { + public void done(boolean doneSync) { + try { + inflightRepository.remove(exchange, id); + } finally { + callback.done(doneSync); + } + } + }); + return sync; + } + + @Override + public String toString() { + return super.toString(); + } + +} Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java?rev=1298447&r1=1298446&r2=1298447&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java Thu Mar 8 16:02:04 2012 @@ -28,20 +28,36 @@ import org.apache.camel.Service; public interface InflightRepository extends Service { /** - * Adds the exchange to the inflight registry + * Adds the exchange to the inflight registry to the total counter * * @param exchange the exchange */ void add(Exchange exchange); /** - * Removes the exchange from the inflight registry + * Removes the exchange from the inflight registry to the total counter * * @param exchange the exchange */ void remove(Exchange exchange); /** + * Adds the exchange to the inflight registry associated to the given route + * + * @param exchange the exchange + * @param routeId the id of the route + */ + void add(Exchange exchange, String routeId); + + /** + * Removes the exchange from the inflight registry removing association to the given route + * + * @param exchange the exchange + * @param routeId the id of the route + */ + void remove(Exchange exchange, String routeId); + + /** * Current size of inflight exchanges. * <p/> * Will return 0 if there are no inflight exchanges. @@ -51,13 +67,29 @@ public interface InflightRepository exte int size(); /** - * Current size of inflight exchanges which are from the given endpoint. + * Will always return 0 due method is deprecated. + * @deprecated will be removed in a future Camel release. + */ + @Deprecated + int size(Endpoint endpoint); + + /** + * Removes the route from the in flight registry. + * <p/> + * Is used for cleaning up resources to avoid leaking. + * + * @param routeId the id of the route + */ + void removeRoute(String routeId); + + /** + * Current size of inflight exchanges which are from the given route. * <p/> * Will return 0 if there are no inflight exchanges. * - * @param endpoint the endpoint where the {@link Exchange} are from. + * @param routeId the id of the route * @return number of exchanges currently in flight. */ - int size(Endpoint endpoint); + int size(String routeId); } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java?rev=1298447&r1=1298446&r2=1298447&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java Thu Mar 8 16:02:04 2012 @@ -25,7 +25,6 @@ import org.apache.camel.builder.RouteBui */ public class InflightRepositoryRouteTest extends ContextTestSupport { - public void testInflight() throws Exception { context.setInflightRepository(new MyInflightRepo()); @@ -34,8 +33,7 @@ public class InflightRepositoryRouteTest template.sendBody("direct:start", "Hello World"); assertEquals(0, context.getInflightRepository().size()); - assertEquals(0, context.getInflightRepository().size(context.getEndpoint("direct:start"))); - assertEquals(0, context.getInflightRepository().size(context.getEndpoint("mock:result"))); + assertEquals(0, context.getInflightRepository().size("foo")); } @Override @@ -43,7 +41,7 @@ public class InflightRepositoryRouteTest return new RouteBuilder() { @Override public void configure() throws Exception { - from("direct:start").to("mock:result"); + from("direct:start").routeId("foo").to("mock:result"); } }; } @@ -53,18 +51,13 @@ public class InflightRepositoryRouteTest @Override public void add(Exchange exchange) { super.add(exchange); - assertEquals(1, context.getInflightRepository().size()); - - assertEquals(1, size(context.getEndpoint("direct:start"))); - - // but 0 from this endpoint - assertEquals(0, size(context.getEndpoint("mock:result"))); } @Override - public void remove(Exchange exchange) { - super.remove(exchange); + public void add(Exchange exchange, String routeId) { + super.add(exchange, routeId); + assertEquals(1, context.getInflightRepository().size("foo")); } } }