Author: davsclaus
Date: Tue Feb 28 09:20:52 2012
New Revision: 1294535

URL: http://svn.apache.org/viewvc?rev=1294535&view=rev
Log:
CAMEL-5048: Keep reference on usage of queues in seda/vm component to avoid leaking memory.
Added: camel/branches/camel-2.9.x/camel-core/src/test/java/org/apache/camel/component/seda/SedaComponentReferenceEndpointTest.java - copied unchanged from r1294533, camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaComponentReferenceEndpointTest.java camel/branches/camel-2.9.x/camel-core/src/test/java/org/apache/camel/component/vm/VmComponentReferenceEndpointTest.java - copied unchanged from r1294533, camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmComponentReferenceEndpointTest.java Modified: camel/branches/camel-2.9.x/ (props changed) camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java Propchange: camel/branches/camel-2.9.x/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Feb 28 09:20:52 2012 @@ -1 +1 @@ -/camel/trunk:1243046,1243057,1243234,1244518,1244644,1244859,1244861,1244864,1244870,1244872,1245021,1291555,1291727,1291848,1291864,1292114,1292384,1292725,1292760,1292767,1293079,1293268,1293288,1293330,1293590,1293828,1293852,1293855,1294130,1294482,1294502 +/camel/trunk:1243046,1243057,1243234,1244518,1244644,1244859,1244861,1244864,1244870,1244872,1245021,1291555,1291727,1291848,1291864,1292114,1292384,1292725,1292760,1292767,1293079,1293268,1293288,1293330,1293590,1293828,1293852,1293855,1294130,1294482,1294502,1294533 Propchange: camel/branches/camel-2.9.x/ ------------------------------------------------------------------------------ Binary property 'svnmerge-integrated' - no diff available. 
Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java?rev=1294535&r1=1294534&r2=1294535&view=diff ============================================================================== --- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java (original) +++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java Tue Feb 28 09:20:52 2012 @@ -35,7 +35,7 @@ public class SedaComponent extends Defau protected final int maxConcurrentConsumers = 500; protected int queueSize; protected int defaultConcurrentConsumers = 1; - private final Map<String, BlockingQueue<Exchange>> queues = new HashMap<String, BlockingQueue<Exchange>>(); + private final Map<String, QueueReference> queues = new HashMap<String, QueueReference>(); public void setQueueSize(int size) { queueSize = size; @@ -56,8 +56,11 @@ public class SedaComponent extends Defau public synchronized BlockingQueue<Exchange> createQueue(String uri, Map<String, Object> parameters) { String key = getQueueKey(uri); - if (queues.containsKey(key)) { - return queues.get(key); + QueueReference ref = getQueues().get(key); + if (ref != null) { + // add the reference before returning queue + ref.addReference(); + return ref.getQueue(); } // create queue @@ -73,10 +76,18 @@ public class SedaComponent extends Defau } } - queues.put(key, queue); + // create and add a new reference queue + ref = new QueueReference(queue); + ref.addReference(); + getQueues().put(key, ref); + return queue; } + public Map<String, QueueReference> getQueues() { + return queues; + } + @Override protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { int consumers = getAndRemoveParameter(parameters, "concurrentConsumers", 
Integer.class, defaultConcurrentConsumers); @@ -90,7 +101,7 @@ public class SedaComponent extends Defau return answer; } - protected String getQueueKey(String uri) { + public String getQueueKey(String uri) { if (uri.contains("?")) { // strip parameters uri = uri.substring(0, uri.indexOf('?')); @@ -100,7 +111,63 @@ public class SedaComponent extends Defau @Override protected void doStop() throws Exception { - queues.clear(); + getQueues().clear(); super.doStop(); } + + /** + * On shutting down the endpoint + * + * @param endpoint the endpoint + */ + void onShutdownEndpoint(SedaEndpoint endpoint) { + // we need to remove the endpoint from the reference counter + String key = getQueueKey(endpoint.getEndpointUri()); + QueueReference ref = getQueues().get(key); + if (ref != null) { + ref.removeReference(); + if (ref.getCount() <= 0) { + // reference no longer needed so remove from queues + getQueues().remove(key); + } + } + } + + /** + * Holder for queue references. + * <p/> + * This is used to keep track of the usages of the queues, so we know when a queue is no longer + * in use, and can safely be discarded. 
+ */ + public static final class QueueReference { + + private final BlockingQueue<Exchange> queue; + private volatile int count; + + private QueueReference(BlockingQueue<Exchange> queue) { + this.queue = queue; + } + + void addReference() { + count++; + } + + void removeReference() { + count--; + } + + /** + * Gets the reference counter + */ + public int getCount() { + return count; + } + + /** + * Gets the queue + */ + public BlockingQueue<Exchange> getQueue() { + return queue; + } + } } Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=1294535&r1=1294534&r2=1294535&view=diff ============================================================================== --- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original) +++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Tue Feb 28 09:20:52 2012 @@ -77,6 +77,11 @@ public class SedaEndpoint extends Defaul this.concurrentConsumers = concurrentConsumers; } + @Override + public SedaComponent getComponent() { + return (SedaComponent) super.getComponent(); + } + public Producer createProducer() throws Exception { return new SedaProducer(this, getQueue(), getWaitForTaskToComplete(), getTimeout(), isBlockWhenFull()); } @@ -326,4 +331,12 @@ public class SedaEndpoint extends Defaul } } + @Override + protected void doShutdown() throws Exception { + // notify component we are shutting down this endpoint + if (getComponent() != null) { + getComponent().onShutdownEndpoint(this); + } + super.doShutdown(); + } } Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java?rev=1294535&r1=1294534&r2=1294535&view=diff ============================================================================== --- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java (original) +++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java Tue Feb 28 09:20:52 2012 @@ -19,10 +19,8 @@ package org.apache.camel.component.vm; import java.util.HashMap; import java.util.Map; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.camel.Exchange; import org.apache.camel.component.seda.SedaComponent; /** @@ -34,32 +32,12 @@ import org.apache.camel.component.seda.S * @version */ public class VmComponent extends SedaComponent { - protected static final Map<String, BlockingQueue<Exchange>> QUEUES = new HashMap<String, BlockingQueue<Exchange>>(); + protected static final Map<String, QueueReference> QUEUES = new HashMap<String, QueueReference>(); private static final AtomicInteger START_COUNTER = new AtomicInteger(); @Override - public synchronized BlockingQueue<Exchange> createQueue(String uri, Map<String, Object> parameters) { - String key = getQueueKey(uri); - - if (QUEUES.containsKey(key)) { - return QUEUES.get(key); - } - - // create queue - BlockingQueue<Exchange> queue; - Integer size = getAndRemoveParameter(parameters, "size", Integer.class); - if (size != null && size > 0) { - queue = new LinkedBlockingQueue<Exchange>(size); - } else { - if (getQueueSize() > 0) { - queue = new LinkedBlockingQueue<Exchange>(getQueueSize()); - } else { - queue = new LinkedBlockingQueue<Exchange>(); - } - } - - QUEUES.put(key, queue); - return queue; + public Map<String, QueueReference> getQueues() { + return QUEUES; } @Override @@ -70,14 +48,9 @@ public class 
VmComponent extends SedaCom @Override protected void doStop() throws Exception { - super.doStop(); - if (START_COUNTER.decrementAndGet() == 0) { - synchronized (QUEUES) { - for (BlockingQueue<Exchange> q : QUEUES.values()) { - q.clear(); - } - QUEUES.clear(); - } + if (START_COUNTER.decrementAndGet() <= 0) { + // clear queues when no more vm components in use + getQueues().clear(); } } }