Author: davsclaus
Date: Wed Mar  7 22:09:02 2012
New Revision: 1298155

URL: http://svn.apache.org/viewvc?rev=1298155&view=rev
Log:
CAMEL-5058: Fix potential leak in the inflight registry when sending to many 
unique endpoints that are not part of a route. Thanks to Zach Calvert for the patch.

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryConcurrentTest.java
      - copied, changed from r1298095, 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryTest.java
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java?rev=1298155&r1=1298154&r2=1298155&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
 Wed Mar  7 22:09:02 2012
@@ -16,8 +16,8 @@
  */
 package org.apache.camel.impl;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.Endpoint;
@@ -37,7 +37,7 @@ public class DefaultInflightRepository e
     private static final transient Logger LOG = 
LoggerFactory.getLogger(DefaultInflightRepository.class);
     private final AtomicInteger totalCount = new AtomicInteger();
     // use endpoint key as key so endpoints with lenient properties is 
registered using the same key (eg dynamic http endpoints)
-    private final ConcurrentMap<String, AtomicInteger> endpointCount = new 
ConcurrentHashMap<String, AtomicInteger>();
+    private final Map<String, AtomicInteger> endpointCount = new 
HashMap<String, AtomicInteger>();
 
     public void add(Exchange exchange) {
         int count = totalCount.incrementAndGet();
@@ -48,9 +48,14 @@ public class DefaultInflightRepository e
         }
 
         String key = exchange.getFromEndpoint().getEndpointKey();
-        AtomicInteger existing = endpointCount.putIfAbsent(key, new 
AtomicInteger(1));
-        if (existing != null) {
-            existing.addAndGet(1);
+        // need to be synchronized as we can concurrently add/remove
+        synchronized (endpointCount) {
+            AtomicInteger existing = endpointCount.get(key);
+            if (existing != null) {
+                existing.incrementAndGet();
+            } else {
+                endpointCount.put(key, new AtomicInteger(1));
+            }
         }
     }
 
@@ -63,12 +68,24 @@ public class DefaultInflightRepository e
         }
 
         String key = exchange.getFromEndpoint().getEndpointKey();
-        AtomicInteger existing = endpointCount.get(key);
-        if (existing != null) {
-            existing.addAndGet(-1);
+        // need to be synchronized as we can concurrently add/remove
+        synchronized (endpointCount) {
+            AtomicInteger existing = endpointCount.get(key);
+            if (existing != null) {
+                if (existing.decrementAndGet() <= 0) {
+                    endpointCount.remove(key);
+                }
+            }
         }
     }
 
+    /**
+     * Internal only - Used for testing purpose.
+     */
+    int endpointSize() {
+        return endpointCount.size();
+    }
+
     public int size() {
         return totalCount.get();
     }
@@ -90,6 +107,8 @@ public class DefaultInflightRepository e
         } else {
             LOG.debug("Shutting down with no inflight exchanges.");
         }
-        endpointCount.clear();
+        synchronized (endpointCount) {
+            endpointCount.clear();
+        }
     }
 }

Copied: 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryConcurrentTest.java
 (from r1298095, 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryConcurrentTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryConcurrentTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryTest.java&r1=1298095&r2=1298155&rev=1298155&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryConcurrentTest.java
 Wed Mar  7 22:09:02 2012
@@ -16,32 +16,126 @@
  */
 package org.apache.camel.impl;
 
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import junit.framework.Assert;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Consumer;
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
-import org.apache.camel.spi.InflightRepository;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
 
 /**
- * @version 
+ * @version
  */
-public class DefaultInflightRepositoryTest extends ContextTestSupport {
-
-    public void testDefaultInflightRepository() throws Exception {
-        InflightRepository repo = new DefaultInflightRepository();
-
-        assertEquals(0, repo.size());
+public class DefaultInflightRepositoryConcurrentTest extends 
ContextTestSupport {
 
-        Exchange e1 = new DefaultExchange(context);
-        repo.add(e1);
-        assertEquals(1, repo.size());
+    public static final int THREAD_COUNT = 20;
+    public static final int TOTAL_ENDPOINTS = 10000;
+    public static final int LOOP_COUNT = 100000;
+
+    private static boolean failure;
+    private static CamelContext context = new DefaultCamelContext();
+
+    public void testThreaded() throws Exception {
+        long started = System.currentTimeMillis();
+
+        DefaultInflightRepository toTest = new DefaultInflightRepository();
+        Endpoint[] eps = new Endpoint[TOTAL_ENDPOINTS];
+
+        for (int i = 0; i < eps.length; i++) {
+            // create TOTAL_ENDPOINTS endpoints
+            Endpoint endpoint = new DefaultEndpoint() {
+                final String uri = "def:" + System.nanoTime();
+                @Override
+                public String getEndpointUri() {
+                    return uri;
+                }
+
+                public boolean isSingleton() {
+                    return false;
+                }
+
+                public Producer createProducer() throws Exception {
+                    return null;
+                }
+
+                public Consumer createConsumer(Processor processor) throws 
Exception {
+                    return null;
+                }
+            };
+            eps[i] = endpoint;
+        }
+
+        AtomicInteger locker = new AtomicInteger(0);
+
+        Thread[] ts = new Thread[THREAD_COUNT];
+        for (int i = 0; i < ts.length; i++) {
+            TypicalConsumer consumer = new TypicalConsumer();
+            consumer.eps = eps;
+            consumer.repo = toTest;
+            consumer.locker = locker;
+            ts[i] = new Thread(consumer);
+        }
+
+        for (int i = 0; i < ts.length; i++) {
+            ts[i].start();
+        }
+        Thread.sleep(1000);
+        while (locker.get() > 0) {
+            synchronized (locker) {
+                locker.wait();
+            }
+        }
+
+        if (failure) {
+            throw new Exception("Failed to properly track endpoints");
+        }
+
+        for (Endpoint ep : eps) {
+            Assert.assertTrue("Size MUST be 0", 0 == toTest.size(ep));
+        }
+
+        if (toTest.size() > 0) {
+            throw new Exception("Test either incomplete or tracking failed");
+        }
 
-        Exchange e2 = new DefaultExchange(context);
-        repo.add(e2);
-        assertEquals(2, repo.size());
-
-        repo.remove(e2);
-        assertEquals(1, repo.size());
+        Assert.assertTrue("Must not have any references left", 0 == 
toTest.endpointSize());
+    }
 
-        repo.remove(e1);
-        assertEquals(0, repo.size());
+    private static class TypicalConsumer implements Runnable {
+        Endpoint[] eps;
+        DefaultInflightRepository repo;
+        Random rand = new Random(System.nanoTime());
+        AtomicInteger locker;
+
+        public void run() {
+            synchronized (locker) {
+                locker.incrementAndGet();
+            }
+            try {
+                for (int i = 0; i < LOOP_COUNT; i++) {
+                    Endpoint ep = eps[Math.abs(rand.nextInt() % eps.length)];
+                    ep.setCamelContext(context);
+                    Exchange ex = new DefaultExchange(ep);
+                    repo.add(ex);
+                    int size = repo.size(ep);
+                    if (size <= 0) {
+                        failure = true;
+                    }
+                    repo.remove(ex);
+                }
+            } catch (Exception e) {
+                failure = true;
+            }
+
+            synchronized (locker) {
+                locker.decrementAndGet();
+                locker.notifyAll();
+            }
+        }
     }
 }


Reply via email to