Author: davsclaus
Date: Tue Feb 28 09:17:43 2012
New Revision: 1294533

URL: http://svn.apache.org/viewvc?rev=1294533&view=rev
Log:
CAMEL-5048: Keep reference on usage of queues in seda/vm component to avoid 
leaking memory.

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaComponentReferenceEndpointTest.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmComponentReferenceEndpointTest.java
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java?rev=1294533&r1=1294532&r2=1294533&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
 Tue Feb 28 09:17:43 2012
@@ -35,7 +35,7 @@ public class SedaComponent extends Defau
     protected final int maxConcurrentConsumers = 500;
     protected int queueSize;
     protected int defaultConcurrentConsumers = 1;
-    private final Map<String, BlockingQueue<Exchange>> queues = new HashMap<String, BlockingQueue<Exchange>>();
+    private final Map<String, QueueReference> queues = new HashMap<String, QueueReference>();
     
     public void setQueueSize(int size) {
         queueSize = size;
@@ -56,8 +56,11 @@ public class SedaComponent extends Defau
     public synchronized BlockingQueue<Exchange> createQueue(String uri, Map<String, Object> parameters) {
         String key = getQueueKey(uri);
 
-        if (queues.containsKey(key)) {
-            return queues.get(key);
+        QueueReference ref = getQueues().get(key);
+        if (ref != null) {
+            // add the reference before returning queue
+            ref.addReference();
+            return ref.getQueue();
         }
 
         // create queue
@@ -73,10 +76,18 @@ public class SedaComponent extends Defau
             }
         }
 
-        queues.put(key, queue);
+        // create and add a new reference queue
+        ref = new QueueReference(queue);
+        ref.addReference();
+        getQueues().put(key, ref);
+
         return queue;
     }
 
+    public Map<String, QueueReference> getQueues() {
+        return queues;
+    }
+
     @Override
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
         int consumers = getAndRemoveParameter(parameters, "concurrentConsumers", Integer.class, defaultConcurrentConsumers);
@@ -90,7 +101,7 @@ public class SedaComponent extends Defau
         return answer;
     }
 
-    protected String getQueueKey(String uri) {
+    public String getQueueKey(String uri) {
         if (uri.contains("?")) {
             // strip parameters
             uri = uri.substring(0, uri.indexOf('?'));
@@ -100,7 +111,63 @@ public class SedaComponent extends Defau
 
     @Override
     protected void doStop() throws Exception {
-        queues.clear();
+        getQueues().clear();
         super.doStop();
     }
+
+    /**
+     * On shutting down the endpoint
+     * 
+     * @param endpoint the endpoint
+     */
+    void onShutdownEndpoint(SedaEndpoint endpoint) {
+        // we need to remove the endpoint from the reference counter
+        String key = getQueueKey(endpoint.getEndpointUri());
+        QueueReference ref = getQueues().get(key);
+        if (ref != null) {
+            ref.removeReference();
+            if (ref.getCount() <= 0) {
+                // reference no longer needed so remove from queues
+                getQueues().remove(key);
+            }
+        }
+    }
+
+    /**
+     * Holder for queue references.
+     * <p/>
+     * This is used to keep track of the usages of the queues, so we know when a queue is no longer
+     * in use, and can safely be discarded.
+     */
+    public static final class QueueReference {
+        
+        private final BlockingQueue<Exchange> queue;
+        private volatile int count;
+
+        private QueueReference(BlockingQueue<Exchange> queue) {
+            this.queue = queue;
+        }
+        
+        void addReference() {
+            count++;
+        }
+        
+        void removeReference() {
+            count--;
+        }
+
+        /**
+         * Gets the reference counter
+         */
+        public int getCount() {
+            return count;
+        }
+
+        /**
+         * Gets the queue
+         */
+        public BlockingQueue<Exchange> getQueue() {
+            return queue;
+        }
+    }
 }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=1294533&r1=1294532&r2=1294533&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
 Tue Feb 28 09:17:43 2012
@@ -77,6 +77,11 @@ public class SedaEndpoint extends Defaul
         this.concurrentConsumers = concurrentConsumers;
     }
 
+    @Override
+    public SedaComponent getComponent() {
+        return (SedaComponent) super.getComponent();
+    }
+
     public Producer createProducer() throws Exception {
         return new SedaProducer(this, getQueue(), getWaitForTaskToComplete(), getTimeout(), isBlockWhenFull());
     }
@@ -326,4 +331,12 @@ public class SedaEndpoint extends Defaul
         }
     }
 
+    @Override
+    protected void doShutdown() throws Exception {
+        // notify component we are shutting down this endpoint
+        if (getComponent() != null) {
+            getComponent().onShutdownEndpoint(this);
+        }
+        super.doShutdown();
+    }
 }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java?rev=1294533&r1=1294532&r2=1294533&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java
 Tue Feb 28 09:17:43 2012
@@ -19,10 +19,8 @@ package org.apache.camel.component.vm;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.camel.Exchange;
 import org.apache.camel.component.seda.SedaComponent;
 
 /**
@@ -34,32 +32,12 @@ import org.apache.camel.component.seda.S
  * @version 
  */
 public class VmComponent extends SedaComponent {
-    protected static final Map<String, BlockingQueue<Exchange>> QUEUES = new HashMap<String, BlockingQueue<Exchange>>();
+    protected static final Map<String, QueueReference> QUEUES = new HashMap<String, QueueReference>();
     private static final AtomicInteger START_COUNTER = new AtomicInteger();
 
     @Override
-    public synchronized BlockingQueue<Exchange> createQueue(String uri, Map<String, Object> parameters) {
-        String key = getQueueKey(uri);
-
-        if (QUEUES.containsKey(key)) {
-            return QUEUES.get(key);
-        }
-
-        // create queue
-        BlockingQueue<Exchange> queue;
-        Integer size = getAndRemoveParameter(parameters, "size", Integer.class);
-        if (size != null && size > 0) {
-            queue = new LinkedBlockingQueue<Exchange>(size);
-        } else {
-            if (getQueueSize() > 0) {
-                queue = new LinkedBlockingQueue<Exchange>(getQueueSize());
-            } else {
-                queue = new LinkedBlockingQueue<Exchange>();
-            }
-        }
-
-        QUEUES.put(key, queue);
-        return queue;
+    public Map<String, QueueReference> getQueues() {
+        return QUEUES;
     }
 
     @Override
@@ -70,14 +48,9 @@ public class VmComponent extends SedaCom
 
     @Override
     protected void doStop() throws Exception {
-        super.doStop();
-        if (START_COUNTER.decrementAndGet() == 0) {
-            synchronized (QUEUES) {
-                for (BlockingQueue<Exchange> q : QUEUES.values()) {
-                    q.clear();
-                }
-                QUEUES.clear();
-            }
+        if (START_COUNTER.decrementAndGet() <= 0) {
+            // clear queues when no more vm components in use
+            getQueues().clear();
         }
     }
 }

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaComponentReferenceEndpointTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaComponentReferenceEndpointTest.java?rev=1294533&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaComponentReferenceEndpointTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaComponentReferenceEndpointTest.java
 Tue Feb 28 09:17:43 2012
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+import java.util.Iterator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ *
+ */
+public class SedaComponentReferenceEndpointTest extends ContextTestSupport {
+    
+    public void testSedaComponentReference() throws Exception {
+        SedaComponent seda = context.getComponent("seda", SedaComponent.class);
+
+        String key = seda.getQueueKey("seda://foo");
+        assertEquals(1, seda.getQueues().get(key).getCount());
+        assertEquals(2, numberOfReferences(seda));
+
+        // add a second consumer on the endpoint
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:foo?blockWhenFull=true").routeId("foo2").to("mock:foo2");
+            }
+        });
+
+        assertEquals(2, seda.getQueues().get(key).getCount());
+        assertEquals(3, numberOfReferences(seda));
+
+        // remove the 1st route
+        context.stopRoute("foo");
+        context.removeRoute("foo");
+
+        assertEquals(1, seda.getQueues().get(key).getCount());
+        assertEquals(2, numberOfReferences(seda));
+
+        // remove the 2nd route
+        context.stopRoute("foo2");
+        context.removeRoute("foo2");
+
+        // and there is no longer queues for the foo key
+        assertNull(seda.getQueues().get(key));
+
+        // there should still be a bar
+        assertEquals(1, numberOfReferences(seda));
+        key = seda.getQueueKey("seda://bar");
+        assertEquals(1, seda.getQueues().get(key).getCount());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:foo").routeId("foo").to("mock:foo");
+
+                from("seda:bar").routeId("bar").to("mock:bar");
+            }
+        };
+    }
+    
+    private int numberOfReferences(SedaComponent seda) {
+        int num = 0;
+        Iterator<SedaComponent.QueueReference> it = seda.getQueues().values().iterator();
+        while (it.hasNext()) {
+            num += it.next().getCount();
+        }
+        return num;
+    }
+
+}

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmComponentReferenceEndpointTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmComponentReferenceEndpointTest.java?rev=1294533&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmComponentReferenceEndpointTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmComponentReferenceEndpointTest.java
 Tue Feb 28 09:17:43 2012
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vm;
+
+import java.util.Iterator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.seda.SedaComponent;
+
+/**
+ *
+ */
+public class VmComponentReferenceEndpointTest extends ContextTestSupport {
+    
+    public void testVmComponentReference() throws Exception {
+        VmComponent vm = context.getComponent("vm", VmComponent.class);
+
+        String key = vm.getQueueKey("vm://foo");
+        assertEquals(1, vm.getQueues().get(key).getCount());
+        assertEquals(2, numberOfReferences(vm));
+
+        // add a second consumer on the endpoint
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("vm:foo?blockWhenFull=true").routeId("foo2").to("mock:foo2");
+            }
+        });
+
+        assertEquals(2, vm.getQueues().get(key).getCount());
+        assertEquals(3, numberOfReferences(vm));
+
+        // remove the 1st route
+        context.stopRoute("foo");
+        context.removeRoute("foo");
+
+        assertEquals(1, vm.getQueues().get(key).getCount());
+        assertEquals(2, numberOfReferences(vm));
+
+        // remove the 2nd route
+        context.stopRoute("foo2");
+        context.removeRoute("foo2");
+
+        // and there is no longer queues for the foo key
+        assertNull(vm.getQueues().get(key));
+
+        // there should still be a bar
+        assertEquals(1, numberOfReferences(vm));
+        key = vm.getQueueKey("vm://bar");
+        assertEquals(1, vm.getQueues().get(key).getCount());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("vm:foo").routeId("foo").to("mock:foo");
+
+                from("vm:bar").routeId("bar").to("mock:bar");
+            }
+        };
+    }
+    
+    private int numberOfReferences(VmComponent vm) {
+        int num = 0;
+        Iterator<SedaComponent.QueueReference> it = vm.getQueues().values().iterator();
+        while (it.hasNext()) {
+            num += it.next().getCount();
+        }
+        return num;
+    }
+
+}


Reply via email to