Author: davsclaus Date: Tue Feb 28 09:17:43 2012 New Revision: 1294533 URL: http://svn.apache.org/viewvc?rev=1294533&view=rev Log: CAMEL-5048: Keep reference on usage of queues in seda/vm component to avoid leaking memory.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaComponentReferenceEndpointTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmComponentReferenceEndpointTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java?rev=1294533&r1=1294532&r2=1294533&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java Tue Feb 28 09:17:43 2012 @@ -35,7 +35,7 @@ public class SedaComponent extends Defau protected final int maxConcurrentConsumers = 500; protected int queueSize; protected int defaultConcurrentConsumers = 1; - private final Map<String, BlockingQueue<Exchange>> queues = new HashMap<String, BlockingQueue<Exchange>>(); + private final Map<String, QueueReference> queues = new HashMap<String, QueueReference>(); public void setQueueSize(int size) { queueSize = size; @@ -56,8 +56,11 @@ public class SedaComponent extends Defau public synchronized BlockingQueue<Exchange> createQueue(String uri, Map<String, Object> parameters) { String key = getQueueKey(uri); - if (queues.containsKey(key)) { - return queues.get(key); + QueueReference ref = getQueues().get(key); + if (ref != null) { + // add the reference before returning queue + ref.addReference(); + return ref.getQueue(); } // create queue @@ -73,10 +76,18 @@ public class SedaComponent extends Defau } } - queues.put(key, queue); + // create and add a new reference queue + ref = new QueueReference(queue); + ref.addReference(); + getQueues().put(key, ref); + return queue; } + public Map<String, QueueReference> getQueues() { + return queues; + } + @Override protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { int consumers = getAndRemoveParameter(parameters, "concurrentConsumers", Integer.class, defaultConcurrentConsumers); @@ -90,7 +101,7 @@ public class SedaComponent extends Defau return answer; } - protected String getQueueKey(String uri) { + public String getQueueKey(String uri) { if (uri.contains("?")) { // strip parameters uri = uri.substring(0, uri.indexOf('?')); @@ -100,7 +111,63 @@ public class SedaComponent extends Defau @Override protected void doStop() throws Exception { - queues.clear(); + getQueues().clear(); super.doStop(); } + + /** + * On shutting down the endpoint + * + * @param endpoint the endpoint + */ + void onShutdownEndpoint(SedaEndpoint endpoint) { + // we need to remove the endpoint from the reference counter + String key = getQueueKey(endpoint.getEndpointUri()); + QueueReference ref = getQueues().get(key); + if (ref != null) { + ref.removeReference(); + if (ref.getCount() <= 0) { + // reference no longer needed so remove from queues + getQueues().remove(key); + } + } + } + + /** + * Holder for queue references. + * <p/> + * This is used to keep track of the usages of the queues, so we know when a queue is no longer + * in use, and can safely be discarded. + */ + public static final class QueueReference { + + private final BlockingQueue<Exchange> queue; + private volatile int count; + + private QueueReference(BlockingQueue<Exchange> queue) { + this.queue = queue; + } + + void addReference() { + count++; + } + + void removeReference() { + count--; + } + + /** + * Gets the reference counter + */ + public int getCount() { + return count; + } + + /** + * Gets the queue + */ + public BlockingQueue<Exchange> getQueue() { + return queue; + } + } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=1294533&r1=1294532&r2=1294533&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Tue Feb 28 09:17:43 2012 @@ -77,6 +77,11 @@ public class SedaEndpoint extends Defaul this.concurrentConsumers = concurrentConsumers; } + @Override + public SedaComponent getComponent() { + return (SedaComponent) super.getComponent(); + } + public Producer createProducer() throws Exception { return new SedaProducer(this, getQueue(), getWaitForTaskToComplete(), getTimeout(), isBlockWhenFull()); } @@ -326,4 +331,12 @@ public class SedaEndpoint extends Defaul } } + @Override + protected void doShutdown() throws Exception { + // notify component we are shutting down this endpoint + if (getComponent() != null) { + getComponent().onShutdownEndpoint(this); + } + super.doShutdown(); + } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java?rev=1294533&r1=1294532&r2=1294533&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java Tue Feb 28 09:17:43 2012 @@ -19,10 +19,8 @@ package org.apache.camel.component.vm; import java.util.HashMap; import java.util.Map; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.camel.Exchange; import org.apache.camel.component.seda.SedaComponent; /** @@ -34,32 +32,12 @@ import org.apache.camel.component.seda.S * @version */ public class VmComponent extends SedaComponent { - protected static final Map<String, BlockingQueue<Exchange>> QUEUES = new HashMap<String, BlockingQueue<Exchange>>(); + protected static final Map<String, QueueReference> QUEUES = new HashMap<String, QueueReference>(); private static final AtomicInteger START_COUNTER = new AtomicInteger(); @Override - public synchronized BlockingQueue<Exchange> createQueue(String uri, Map<String, Object> parameters) { - String key = getQueueKey(uri); - - if (QUEUES.containsKey(key)) { - return QUEUES.get(key); - } - - // create queue - BlockingQueue<Exchange> queue; - Integer size = getAndRemoveParameter(parameters, "size", Integer.class); - if (size != null && size > 0) { - queue = new LinkedBlockingQueue<Exchange>(size); - } else { - if (getQueueSize() > 0) { - queue = new LinkedBlockingQueue<Exchange>(getQueueSize()); - } else { - queue = new LinkedBlockingQueue<Exchange>(); - } - } - - QUEUES.put(key, queue); - return queue; + public Map<String, QueueReference> getQueues() { + return QUEUES; } @Override @@ -70,14 +48,9 @@ public class VmComponent extends SedaCom @Override protected void doStop() throws Exception { - super.doStop(); - if (START_COUNTER.decrementAndGet() == 0) { - synchronized (QUEUES) { - for (BlockingQueue<Exchange> q : QUEUES.values()) { - q.clear(); - } - QUEUES.clear(); - } + if (START_COUNTER.decrementAndGet() <= 0) { + // clear queues when no more vm components in use + getQueues().clear(); } } } Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaComponentReferenceEndpointTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaComponentReferenceEndpointTest.java?rev=1294533&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaComponentReferenceEndpointTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaComponentReferenceEndpointTest.java Tue Feb 28 09:17:43 2012 @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.seda; + +import java.util.Iterator; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +/** + * + */ +public class SedaComponentReferenceEndpointTest extends ContextTestSupport { + + public void testSedaComponentReference() throws Exception { + SedaComponent seda = context.getComponent("seda", SedaComponent.class); + + String key = seda.getQueueKey("seda://foo"); + assertEquals(1, seda.getQueues().get(key).getCount()); + assertEquals(2, numberOfReferences(seda)); + + // add a second consumer on the endpoint + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("seda:foo?blockWhenFull=true").routeId("foo2").to("mock:foo2"); + } + }); + + assertEquals(2, seda.getQueues().get(key).getCount()); + assertEquals(3, numberOfReferences(seda)); + + // remove the 1st route + context.stopRoute("foo"); + context.removeRoute("foo"); + + assertEquals(1, seda.getQueues().get(key).getCount()); + assertEquals(2, numberOfReferences(seda)); + + // remove the 2nd route + context.stopRoute("foo2"); + context.removeRoute("foo2"); + + // and there is no longer queues for the foo key + assertNull(seda.getQueues().get(key)); + + // there should still be a bar + assertEquals(1, numberOfReferences(seda)); + key = seda.getQueueKey("seda://bar"); + assertEquals(1, seda.getQueues().get(key).getCount()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("seda:foo").routeId("foo").to("mock:foo"); + + from("seda:bar").routeId("bar").to("mock:bar"); + } + }; + } + + private int numberOfReferences(SedaComponent seda) { + int num = 0; + Iterator<SedaComponent.QueueReference> it = seda.getQueues().values().iterator(); + while (it.hasNext()) { + num += it.next().getCount(); + } + return num; + } + +} Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmComponentReferenceEndpointTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmComponentReferenceEndpointTest.java?rev=1294533&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmComponentReferenceEndpointTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmComponentReferenceEndpointTest.java Tue Feb 28 09:17:43 2012 @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.vm; + +import java.util.Iterator; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.seda.SedaComponent; + +/** + * + */ +public class VmComponentReferenceEndpointTest extends ContextTestSupport { + + public void testVmComponentReference() throws Exception { + VmComponent vm = context.getComponent("vm", VmComponent.class); + + String key = vm.getQueueKey("vm://foo"); + assertEquals(1, vm.getQueues().get(key).getCount()); + assertEquals(2, numberOfReferences(vm)); + + // add a second consumer on the endpoint + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("vm:foo?blockWhenFull=true").routeId("foo2").to("mock:foo2"); + } + }); + + assertEquals(2, vm.getQueues().get(key).getCount()); + assertEquals(3, numberOfReferences(vm)); + + // remove the 1st route + context.stopRoute("foo"); + context.removeRoute("foo"); + + assertEquals(1, vm.getQueues().get(key).getCount()); + assertEquals(2, numberOfReferences(vm)); + + // remove the 2nd route + context.stopRoute("foo2"); + context.removeRoute("foo2"); + + // and there is no longer queues for the foo key + assertNull(vm.getQueues().get(key)); + + // there should still be a bar + assertEquals(1, numberOfReferences(vm)); + key = vm.getQueueKey("vm://bar"); + assertEquals(1, vm.getQueues().get(key).getCount()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("vm:foo").routeId("foo").to("mock:foo"); + + from("vm:bar").routeId("bar").to("mock:bar"); + } + }; + } + + private int numberOfReferences(VmComponent vm) { + int num = 0; + Iterator<SedaComponent.QueueReference> it = vm.getQueues().values().iterator(); + while (it.hasNext()) { + num += it.next().getCount(); + } + return num; + } + +}