Author: davsclaus
Date: Tue Feb 28 09:20:52 2012
New Revision: 1294535

URL: http://svn.apache.org/viewvc?rev=1294535&view=rev
Log:
CAMEL-5048: Keep reference on usage of queues in seda/vm component to avoid 
leaking memory.

Added:
    
camel/branches/camel-2.9.x/camel-core/src/test/java/org/apache/camel/component/seda/SedaComponentReferenceEndpointTest.java
      - copied unchanged from r1294533, 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaComponentReferenceEndpointTest.java
    
camel/branches/camel-2.9.x/camel-core/src/test/java/org/apache/camel/component/vm/VmComponentReferenceEndpointTest.java
      - copied unchanged from r1294533, 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmComponentReferenceEndpointTest.java
Modified:
    camel/branches/camel-2.9.x/   (props changed)
    
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
    
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
    
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java

Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb 28 09:20:52 2012
@@ -1 +1 @@
-/camel/trunk:1243046,1243057,1243234,1244518,1244644,1244859,1244861,1244864,1244870,1244872,1245021,1291555,1291727,1291848,1291864,1292114,1292384,1292725,1292760,1292767,1293079,1293268,1293288,1293330,1293590,1293828,1293852,1293855,1294130,1294482,1294502
+/camel/trunk:1243046,1243057,1243234,1244518,1244644,1244859,1244861,1244864,1244870,1244872,1245021,1291555,1291727,1291848,1291864,1292114,1292384,1292725,1292760,1292767,1293079,1293268,1293288,1293330,1293590,1293828,1293852,1293855,1294130,1294482,1294502,1294533

Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: 
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java?rev=1294535&r1=1294534&r2=1294535&view=diff
==============================================================================
--- 
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
 (original)
+++ 
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
 Tue Feb 28 09:20:52 2012
@@ -35,7 +35,7 @@ public class SedaComponent extends Defau
     protected final int maxConcurrentConsumers = 500;
     protected int queueSize;
     protected int defaultConcurrentConsumers = 1;
-    private final Map<String, BlockingQueue<Exchange>> queues = new 
HashMap<String, BlockingQueue<Exchange>>();
+    private final Map<String, QueueReference> queues = new HashMap<String, 
QueueReference>();
     
     public void setQueueSize(int size) {
         queueSize = size;
@@ -56,8 +56,11 @@ public class SedaComponent extends Defau
     public synchronized BlockingQueue<Exchange> createQueue(String uri, 
Map<String, Object> parameters) {
         String key = getQueueKey(uri);
 
-        if (queues.containsKey(key)) {
-            return queues.get(key);
+        QueueReference ref = getQueues().get(key);
+        if (ref != null) {
+            // add the reference before returning queue
+            ref.addReference();
+            return ref.getQueue();
         }
 
         // create queue
@@ -73,10 +76,18 @@ public class SedaComponent extends Defau
             }
         }
 
-        queues.put(key, queue);
+        // create and add a new reference queue
+        ref = new QueueReference(queue);
+        ref.addReference();
+        getQueues().put(key, ref);
+
         return queue;
     }
 
+    public Map<String, QueueReference> getQueues() {
+        return queues;
+    }
+
     @Override
     protected Endpoint createEndpoint(String uri, String remaining, 
Map<String, Object> parameters) throws Exception {
         int consumers = getAndRemoveParameter(parameters, 
"concurrentConsumers", Integer.class, defaultConcurrentConsumers);
@@ -90,7 +101,7 @@ public class SedaComponent extends Defau
         return answer;
     }
 
-    protected String getQueueKey(String uri) {
+    public String getQueueKey(String uri) {
         if (uri.contains("?")) {
             // strip parameters
             uri = uri.substring(0, uri.indexOf('?'));
@@ -100,7 +111,63 @@ public class SedaComponent extends Defau
 
     @Override
     protected void doStop() throws Exception {
-        queues.clear();
+        getQueues().clear();
         super.doStop();
     }
+
+    /**
+     * On shutting down the endpoint
+     * 
+     * @param endpoint the endpoint
+     */
+    void onShutdownEndpoint(SedaEndpoint endpoint) {
+        // we need to remove the endpoint from the reference counter
+        String key = getQueueKey(endpoint.getEndpointUri());
+        QueueReference ref = getQueues().get(key);
+        if (ref != null) {
+            ref.removeReference();
+            if (ref.getCount() <= 0) {
+                // reference no longer needed so remove from queues
+                getQueues().remove(key);
+            }
+        }
+    }
+
+    /**
+     * Holder for queue references.
+     * <p/>
+     * This is used to keep track of the usages of the queues, so we know when 
a queue is no longer
+     * in use, and can safely be discarded.
+     */
+    public static final class QueueReference {
+        
+        private final BlockingQueue<Exchange> queue;
+        private volatile int count;
+
+        private QueueReference(BlockingQueue<Exchange> queue) {
+            this.queue = queue;
+        }
+        
+        void addReference() {
+            count++;
+        }
+        
+        void removeReference() {
+            count--;
+        }
+
+        /**
+         * Gets the reference counter
+         */
+        public int getCount() {
+            return count;
+        }
+
+        /**
+         * Gets the queue
+         */
+        public BlockingQueue<Exchange> getQueue() {
+            return queue;
+        }
+    }
 }

Modified: 
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=1294535&r1=1294534&r2=1294535&view=diff
==============================================================================
--- 
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
 (original)
+++ 
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
 Tue Feb 28 09:20:52 2012
@@ -77,6 +77,11 @@ public class SedaEndpoint extends Defaul
         this.concurrentConsumers = concurrentConsumers;
     }
 
+    @Override
+    public SedaComponent getComponent() {
+        return (SedaComponent) super.getComponent();
+    }
+
     public Producer createProducer() throws Exception {
         return new SedaProducer(this, getQueue(), getWaitForTaskToComplete(), 
getTimeout(), isBlockWhenFull());
     }
@@ -326,4 +331,12 @@ public class SedaEndpoint extends Defaul
         }
     }
 
+    @Override
+    protected void doShutdown() throws Exception {
+        // notify component we are shutting down this endpoint
+        if (getComponent() != null) {
+            getComponent().onShutdownEndpoint(this);
+        }
+        super.doShutdown();
+    }
 }

Modified: 
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java?rev=1294535&r1=1294534&r2=1294535&view=diff
==============================================================================
--- 
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java
 (original)
+++ 
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java
 Tue Feb 28 09:20:52 2012
@@ -19,10 +19,8 @@ package org.apache.camel.component.vm;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.camel.Exchange;
 import org.apache.camel.component.seda.SedaComponent;
 
 /**
@@ -34,32 +32,12 @@ import org.apache.camel.component.seda.S
  * @version 
  */
 public class VmComponent extends SedaComponent {
-    protected static final Map<String, BlockingQueue<Exchange>> QUEUES = new 
HashMap<String, BlockingQueue<Exchange>>();
+    protected static final Map<String, QueueReference> QUEUES = new 
HashMap<String, QueueReference>();
     private static final AtomicInteger START_COUNTER = new AtomicInteger();
 
     @Override
-    public synchronized BlockingQueue<Exchange> createQueue(String uri, 
Map<String, Object> parameters) {
-        String key = getQueueKey(uri);
-
-        if (QUEUES.containsKey(key)) {
-            return QUEUES.get(key);
-        }
-
-        // create queue
-        BlockingQueue<Exchange> queue;
-        Integer size = getAndRemoveParameter(parameters, "size", 
Integer.class);
-        if (size != null && size > 0) {
-            queue = new LinkedBlockingQueue<Exchange>(size);
-        } else {
-            if (getQueueSize() > 0) {
-                queue = new LinkedBlockingQueue<Exchange>(getQueueSize());
-            } else {
-                queue = new LinkedBlockingQueue<Exchange>();
-            }
-        }
-
-        QUEUES.put(key, queue);
-        return queue;
+    public Map<String, QueueReference> getQueues() {
+        return QUEUES;
     }
 
     @Override
@@ -70,14 +48,9 @@ public class VmComponent extends SedaCom
 
     @Override
     protected void doStop() throws Exception {
-        super.doStop();
-        if (START_COUNTER.decrementAndGet() == 0) {
-            synchronized (QUEUES) {
-                for (BlockingQueue<Exchange> q : QUEUES.values()) {
-                    q.clear();
-                }
-                QUEUES.clear();
-            }
+        if (START_COUNTER.decrementAndGet() <= 0) {
+            // clear queues when no more vm components in use
+            getQueues().clear();
         }
     }
 }


Reply via email to