Author: davsclaus Date: Sat Mar 3 16:02:50 2012 New Revision: 1296653 URL: http://svn.apache.org/viewvc?rev=1296653&view=rev Log: CAMEL-4058: Fix leak in inflight registry when adding/removing a lot of routes at runtime.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRemoveRouteTest.java - copied, changed from r1296580, camel/trunk/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java?rev=1296653&r1=1296652&r2=1296653&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java Sat Mar 3 16:02:50 2012 @@ -77,6 +77,15 @@ public class DefaultInflightRepository e return answer != null ? 
answer.get() : 0; } + public void removeEndpoint(Endpoint endpoint) { + // remove endpoint if there is no current inflight + String key = endpoint.getEndpointKey(); + AtomicInteger existing = endpointCount.get(key); + if (existing != null && existing.get() <= 0) { + endpointCount.remove(key); + } + } + @Override protected void doStart() throws Exception { } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java?rev=1296653&r1=1296652&r2=1296653&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java Sat Mar 3 16:02:50 2012 @@ -272,6 +272,11 @@ public class RouteService extends ChildS for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) { strategy.onRoutesRemove(routes); } + + // remove the routes from the inflight registry + for (Route route : routes) { + camelContext.getInflightRepository().removeEndpoint(route.getEndpoint()); + } // remove the routes from the collections camelContext.removeRouteCollection(routes); Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java?rev=1296653&r1=1296652&r2=1296653&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java Sat Mar 3 16:02:50 2012 @@ -60,4 +60,13 @@ public interface InflightRepository exte */ int size(Endpoint endpoint); + /** + * Remove an endpoint from this registry. 
+ * <p/> + * This is used to cleanup resources that are no longer needed. + * + * @param endpoint the endpoint to remove + */ + void removeEndpoint(Endpoint endpoint); + } Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRemoveRouteTest.java (from r1296580, camel/trunk/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRemoveRouteTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRemoveRouteTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java&r1=1296580&r2=1296653&rev=1296653&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRemoveRouteTest.java Sat Mar 3 16:02:50 2012 @@ -17,25 +17,29 @@ package org.apache.camel.impl; import org.apache.camel.ContextTestSupport; -import org.apache.camel.Exchange; +import org.apache.camel.Endpoint; import org.apache.camel.builder.RouteBuilder; /** * @version */ -public class InflightRepositoryRouteTest extends ContextTestSupport { - +public class InflightRepositoryRemoveRouteTest extends ContextTestSupport { + + private MyInflightRepo repo = new MyInflightRepo(); public void testInflight() throws Exception { - context.setInflightRepository(new MyInflightRepo()); - + context.setInflightRepository(repo); assertEquals(0, context.getInflightRepository().size()); + getMockEndpoint("mock:result").expectedMessageCount(1); template.sendBody("direct:start", "Hello World"); + assertMockEndpointsSatisfied(); + + context.stopRoute("foo"); + assertTrue("Should have removed route", context.removeRoute("foo")); assertEquals(0, 
context.getInflightRepository().size()); - assertEquals(0, context.getInflightRepository().size(context.getEndpoint("direct:start"))); - assertEquals(0, context.getInflightRepository().size(context.getEndpoint("mock:result"))); + assertTrue("Should have removed remote from inflight registry", repo.isRemoved()); } @Override @@ -43,28 +47,23 @@ public class InflightRepositoryRouteTest return new RouteBuilder() { @Override public void configure() throws Exception { - from("direct:start").to("mock:result"); + from("direct:start").routeId("foo").to("mock:result"); } }; } private class MyInflightRepo extends DefaultInflightRepository { - @Override - public void add(Exchange exchange) { - super.add(exchange); - - assertEquals(1, context.getInflightRepository().size()); + private volatile boolean removed; - assertEquals(1, size(context.getEndpoint("direct:start"))); - - // but 0 from this endpoint - assertEquals(0, size(context.getEndpoint("mock:result"))); + @Override + public void removeEndpoint(Endpoint endpoint) { + super.removeEndpoint(endpoint); + removed = true; } - @Override - public void remove(Exchange exchange) { - super.remove(exchange); + public boolean isRemoved() { + return removed; } } }