Author: davsclaus
Date: Sat Feb 20 11:56:20 2010
New Revision: 912103

URL: http://svn.apache.org/viewvc?rev=912103&view=rev
Log:
CAMEL-2484: RecipientList now acquires and releases producers from 
ProducerCache. Also fixed a memory issue when using Mina or FTP producers with 
recipientList. Fixed LRUCache to stop its cached services when it is stopped to 
better release resources when Camel is stopped.

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListLeakTest.java
      - copied, changed from r911857, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListTest.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/util/LRUCacheTest.java   
(with props)
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/LRUCache.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationUsingDefaultsTest.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedUnregisterProducerTest.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java?rev=912103&r1=912102&r2=912103&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java 
(original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java 
Sat Feb 20 11:56:20 2010
@@ -63,13 +63,32 @@
         this.producers = cache;
     }
 
-    public Producer getProducer(Endpoint endpoint) {
-        // As the producer is returned outside this method we do not want to 
return pooled producers
-        // so we pass in false to the method. if we returned pooled producers 
then the user had
-        // to remember to return it back in the pool.
-        // See method doInProducer that is safe template pattern where we 
handle the lifecycle and
-        // thus safely can use pooled producers there
-        return doGetProducer(endpoint, false);
+    /**
+     * Acquires a pooled producer which you <b>must</b> release back again 
after usage using the
+     * {@link #releaseProducer(org.apache.camel.Endpoint, 
org.apache.camel.Producer)} method.
+     *
+     * @param endpoint the endpoint
+     * @return the producer
+     */
+    public Producer acquireProducer(Endpoint endpoint) {
+        return doGetProducer(endpoint, true);
+    }
+
+    /**
+     * Releases an acquired producer back after usage.
+     *
+     * @param endpoint the endpoint
+     * @param producer the producer to release
+     * @throws Exception can be thrown if error stopping producer if that was 
needed.
+     */
+    public void releaseProducer(Endpoint endpoint, Producer producer) throws 
Exception {
+        if (producer instanceof ServicePoolAware) {
+            // release back to the pool
+            pool.release(endpoint, producer);
+        } else if (!producer.isSingleton()) {
+            // stop non singleton producers as we should not leak resources
+            producer.stop();
+        }
     }
 
     /**
@@ -215,8 +234,8 @@
             // create a new producer
             try {
                 answer = endpoint.createProducer();
-                // add it as service to camel context so it can be managed as 
well
-                context.addService(answer);
+                // must then start service so producer is ready to be used
+                ServiceHelper.startService(answer);
             } catch (Exception e) {
                 throw new FailedToCreateProducerException(endpoint, e);
             }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java?rev=912103&r1=912102&r2=912103&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
 Sat Feb 20 11:56:20 2010
@@ -18,7 +18,9 @@
 
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.Endpoint;
@@ -87,19 +89,32 @@
     public void sendToRecipientList(Exchange exchange, Object receipientList) 
throws Exception {
         Iterator<Object> iter = ObjectHelper.createIterator(receipientList, 
delimiter);
 
-        List<Processor> processors = new ArrayList<Processor>();
-        while (iter.hasNext()) {
-            Object recipient = iter.next();
-            Endpoint endpoint = resolveEndpoint(exchange, recipient);
-            Producer producer = 
getProducerCache(exchange).getProducer(endpoint);
-            processors.add(producer);
+        // we should acquire and release the producers we need so we can 
leverage the producer
+        // cache to the fullest
+        ProducerCache cache = getProducerCache(exchange);
+        Map<Endpoint, Producer> producers = new LinkedHashMap<Endpoint, 
Producer>();
+        try {
+            List<Processor> processors = new ArrayList<Processor>();
+            while (iter.hasNext()) {
+                Object recipient = iter.next();
+                Endpoint endpoint = resolveEndpoint(exchange, recipient);
+                // acquire producer which we then release later
+                Producer producer = cache.acquireProducer(endpoint);
+                processors.add(producer);
+                producers.put(endpoint, producer);
+            }
+
+            MulticastProcessor mp = new MulticastProcessor(processors, 
getAggregationStrategy(), isParallelProcessing(),
+                                                           
getExecutorService(), false, isStopOnException());
+
+            // now let the multicast process the exchange
+            mp.process(exchange);
+        } finally {
+            // and release the producers back to the producer cache
+            for (Map.Entry<Endpoint, Producer> entry : producers.entrySet()) {
+                cache.releaseProducer(entry.getKey(), entry.getValue());
+            }
         }
-
-        MulticastProcessor mp = new MulticastProcessor(processors, 
getAggregationStrategy(), isParallelProcessing(),
-                                                       getExecutorService(), 
false, isStopOnException());
-
-        // now let the multicast process the exchange
-        mp.process(exchange);
     }
 
     protected ProducerCache getProducerCache(Exchange exchange) throws 
Exception {

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/util/LRUCache.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/LRUCache.java?rev=912103&r1=912102&r2=912103&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/LRUCache.java 
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/LRUCache.java 
Sat Feb 20 11:56:20 2010
@@ -19,12 +19,14 @@
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import org.apache.camel.Service;
+
 /**
  * A Least Recently Used Cache
  *
  * @version $Revision$
  */
-public class LRUCache<K, V> extends LinkedHashMap<K, V> {
+public class LRUCache<K, V> extends LinkedHashMap<K, V> implements Service {
     private static final long serialVersionUID = -342098639681884413L;
     private int maxCacheSize = 10000;
 
@@ -59,4 +61,16 @@
     protected boolean removeEldestEntry(Map.Entry<K, V> entry) {
         return size() > maxCacheSize;
     }
+
+    public void start() throws Exception {
+        // noop
+    }
+
+    public void stop() throws Exception {
+        // stop the value and clear the cache
+        if (!isEmpty()) {
+            ServiceHelper.stopServices(values());
+            clear();
+        }
+    }
 }
\ No newline at end of file

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java?rev=912103&r1=912102&r2=912103&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java
 Sat Feb 20 11:56:20 2010
@@ -41,4 +41,21 @@
         cache.stop();
     }
 
+    public void testCacheProducerAcquireAndRelease() throws Exception {
+        ProducerCache cache = new ProducerCache(context);
+        cache.start();
+
+        assertEquals("Size should be 0", 0, cache.size());
+
+        // test that we cache at most 1000 producers to avoid it eating to 
much memory
+        for (int i = 0; i < 1003; i++) {
+            Endpoint e = context.getEndpoint("direct:queue:" + i);
+            Producer p = cache.acquireProducer(e);
+            cache.releaseProducer(e, p);
+        }
+
+        assertEquals("Size should be 1000", 1000, cache.size());
+        cache.stop();
+    }
+
 }

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationUsingDefaultsTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationUsingDefaultsTest.java?rev=912103&r1=912102&r2=912103&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationUsingDefaultsTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationUsingDefaultsTest.java
 Sat Feb 20 11:56:20 2010
@@ -73,7 +73,7 @@
         assertEquals("Could not find 1 consumers: " + s, 1, s.size());
 
         s = mbsc.queryNames(new ObjectName(domainName + ":type=producers,*"), 
null);
-        assertEquals("Could not find 2 producers: " + s, 2, s.size());
+        assertEquals("Could not find 2 producers: " + s, 0, s.size());
 
         s = mbsc.queryNames(new ObjectName(domainName + ":type=routes,*"), 
null);
         assertEquals("Could not find 1 route: " + s, 1, s.size());

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedUnregisterProducerTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedUnregisterProducerTest.java?rev=912103&r1=912102&r2=912103&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedUnregisterProducerTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedUnregisterProducerTest.java
 Sat Feb 20 11:56:20 2010
@@ -55,27 +55,29 @@
         producer.process(exchange);
         producer.stop();
 
-        MBeanServer mbeanServer = 
context.getManagementStrategy().getManagementAgent().getMBeanServer();
+        // TODO: producers are not managed due they can lead to memory leak 
CAMEL-2484
 
-        Set<ObjectName> set = mbeanServer.queryNames(new 
ObjectName("*:type=producers,*"), null);
-        assertEquals(1, set.size());
+//        MBeanServer mbeanServer = 
context.getManagementStrategy().getManagementAgent().getMBeanServer();
 
-        ObjectName on = set.iterator().next();
+//        Set<ObjectName> set = mbeanServer.queryNames(new 
ObjectName("*:type=producers,*"), null);
+//        assertEquals(0, set.size());
 
-        assertTrue("Should be registered", mbeanServer.isRegistered(on));
-        String uri = (String) mbeanServer.getAttribute(on, "EndpointUri");
-        assertEquals("mock://result", uri);
+//        ObjectName on = set.iterator().next();
+
+//        assertTrue("Should be registered", mbeanServer.isRegistered(on));
+//        String uri = (String) mbeanServer.getAttribute(on, "EndpointUri");
+//        assertEquals("mock://result", uri);
 
         // TODO: populating route id on producers is not implemented yet
 //        String routeId = (String) mbeanServer.getAttribute(on, "RouteId");
 //        assertEquals("route1", routeId);
 
-        Boolean singleton = (Boolean) mbeanServer.getAttribute(on, 
"Singleton");
-        assertEquals(Boolean.TRUE, singleton);
+//        Boolean singleton = (Boolean) mbeanServer.getAttribute(on, 
"Singleton");
+//        assertEquals(Boolean.TRUE, singleton);
 
         context.stop();
 
-        assertFalse("Should no longer be registered", 
mbeanServer.isRegistered(on));
+//        assertFalse("Should no longer be registered", 
mbeanServer.isRegistered(on));
     }
 
     @Override

Copied: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListLeakTest.java
 (from r911857, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListLeakTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListLeakTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListTest.java&r1=911857&r2=912103&rev=912103&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListLeakTest.java
 Sat Feb 20 11:56:20 2010
@@ -23,37 +23,33 @@
 /**
  * @version $Revision$
  */
-public class RecipientListTest extends ContextTestSupport {
+public class RecipientListLeakTest extends ContextTestSupport {
 
-    public void testSendingAMessageUsingMulticastReceivesItsOwnExchange() 
throws Exception {
+    public void testRecipientListLeak() throws Exception {
         MockEndpoint x = getMockEndpoint("mock:x");
         MockEndpoint y = getMockEndpoint("mock:y");
         MockEndpoint z = getMockEndpoint("mock:z");
 
-        x.expectedBodiesReceived("answer");
-        y.expectedBodiesReceived("answer");
-        z.expectedBodiesReceived("answer");
+        x.expectedBodiesReceived("Hello World", "Bye World");
+        y.expectedBodiesReceived("Hello World", "Bye World");
+        z.expectedBodiesReceived("Hello World", "Bye World");
 
-        sendBody();
+        sendBody("Hello World");
+        sendBody("Bye World");
 
         assertMockEndpointsSatisfied();
     }
 
-    protected void sendBody() {
-        template.sendBodyAndHeader("direct:a", "answer", "recipientListHeader",
-                "mock:x,mock:y,mock:z");
+    protected void sendBody(String body) {
+        template.sendBodyAndHeader("direct:a", body, "recipientListHeader", 
"mock:x,mock:y,mock:z");
     }
 
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                // START SNIPPET: example
-                from("direct:a").recipientList(
-                        header("recipientListHeader").tokenize(","));
-                // END SNIPPET: example
+                from("direct:a").recipientList(header("recipientListHeader"));
             }
         };
-
     }
 
-}
+}
\ No newline at end of file

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/util/LRUCacheTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/util/LRUCacheTest.java?rev=912103&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/util/LRUCacheTest.java 
(added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/util/LRUCacheTest.java 
Sat Feb 20 11:56:20 2010
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util;
+
+import junit.framework.TestCase;
+import org.apache.camel.Service;
+
+/**
+ * @version $Revision$
+ */
+public class LRUCacheTest extends TestCase {
+
+    private LRUCache<String, Service> cache;
+
+    @Override
+    protected void setUp() throws Exception {
+        cache = new LRUCache<String, Service>(10);
+    }
+
+    public void testLRUCache() {
+        MyService service1 = new MyService();
+        MyService service2 = new MyService();
+
+        cache.put("A", service1);
+        cache.put("B", service2);
+
+        assertEquals(2, cache.size());
+
+        assertSame(service1, cache.get("A"));
+        assertSame(service2, cache.get("B"));
+    }
+
+    public void testLRUCacheStop() throws Exception {
+        MyService service1 = new MyService();
+        MyService service2 = new MyService();
+
+        cache.put("A", service1);
+        cache.put("B", service2);
+
+        assertEquals(false, service1.isStopped());
+        assertEquals(false, service2.isStopped());
+
+        cache.stop();
+
+        assertEquals(0, cache.size());
+
+        assertEquals(true, service1.isStopped());
+        assertEquals(true, service2.isStopped());
+    }
+
+    private final class MyService implements Service {
+
+        private boolean stopped;
+
+        public void start() throws Exception {
+        }
+
+        public void stop() throws Exception {
+            stopped = true;
+        }
+
+        public boolean isStopped() {
+            return stopped;
+        }
+    }
+}

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/util/LRUCacheTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/util/LRUCacheTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date


Reply via email to