Author: davsclaus Date: Wed Mar 7 22:09:02 2012 New Revision: 1298155 URL: http://svn.apache.org/viewvc?rev=1298155&view=rev Log: CAMEL-5058: Fix potential leak in inflight registry if sending to many unique endpoints, not from a routes. Thanks to Zach Calvert for the patch.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryConcurrentTest.java - copied, changed from r1298095, camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java?rev=1298155&r1=1298154&r2=1298155&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java Wed Mar 7 22:09:02 2012 @@ -16,8 +16,8 @@ */ package org.apache.camel.impl; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.Endpoint; @@ -37,7 +37,7 @@ public class DefaultInflightRepository e private static final transient Logger LOG = LoggerFactory.getLogger(DefaultInflightRepository.class); private final AtomicInteger totalCount = new AtomicInteger(); // use endpoint key as key so endpoints with lenient properties is registered using the same key (eg dynamic http endpoints) - private final ConcurrentMap<String, AtomicInteger> endpointCount = new ConcurrentHashMap<String, AtomicInteger>(); + private final Map<String, AtomicInteger> endpointCount = new HashMap<String, AtomicInteger>(); public void add(Exchange exchange) { int count = totalCount.incrementAndGet(); @@ -48,9 +48,14 @@ public class DefaultInflightRepository e } String key = exchange.getFromEndpoint().getEndpointKey(); - AtomicInteger existing = endpointCount.putIfAbsent(key, new AtomicInteger(1)); - if (existing != null) { - existing.addAndGet(1); + // need to be synchronized as we can concurrently add/remove + synchronized (endpointCount) { + AtomicInteger existing = endpointCount.get(key); + if (existing != null) { + existing.incrementAndGet(); + } else { + endpointCount.put(key, new AtomicInteger(1)); + } } } @@ -63,12 +68,24 @@ public class DefaultInflightRepository e } String key = exchange.getFromEndpoint().getEndpointKey(); - AtomicInteger existing = endpointCount.get(key); - if (existing != null) { - existing.addAndGet(-1); + // need to be synchronized as we can concurrently add/remove + synchronized (endpointCount) { + AtomicInteger existing = endpointCount.get(key); + if (existing != null) { + if (existing.decrementAndGet() <= 0) { + endpointCount.remove(key); + } + } } } + /** + * Internal only - Used for testing purpose. + */ + int endpointSize() { + return endpointCount.size(); + } + public int size() { return totalCount.get(); } @@ -90,6 +107,8 @@ public class DefaultInflightRepository e } else { LOG.debug("Shutting down with no inflight exchanges."); } - endpointCount.clear(); + synchronized (endpointCount) { + endpointCount.clear(); + } } } Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryConcurrentTest.java (from r1298095, camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryConcurrentTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryConcurrentTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryTest.java&r1=1298095&r2=1298155&rev=1298155&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryConcurrentTest.java Wed Mar 7 22:09:02 2012 @@ -16,32 +16,126 @@ */ package org.apache.camel.impl; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; + +import junit.framework.Assert; +import org.apache.camel.CamelContext; +import org.apache.camel.Consumer; import org.apache.camel.ContextTestSupport; +import org.apache.camel.Endpoint; import org.apache.camel.Exchange; -import org.apache.camel.spi.InflightRepository; +import org.apache.camel.Processor; +import org.apache.camel.Producer; /** - * @version + * @version */ -public class DefaultInflightRepositoryTest extends ContextTestSupport { - - public void testDefaultInflightRepository() throws Exception { - InflightRepository repo = new DefaultInflightRepository(); - - assertEquals(0, repo.size()); +public class DefaultInflightRepositoryConcurrentTest extends ContextTestSupport { - Exchange e1 = new DefaultExchange(context); - repo.add(e1); - assertEquals(1, repo.size()); + public static final int THREAD_COUNT = 20; + public static final int TOTAL_ENDPOINTS = 10000; + public static final int LOOP_COUNT = 100000; + + private static boolean failure; + private static CamelContext context = new DefaultCamelContext(); + + public void testThreaded() throws Exception { + long started = System.currentTimeMillis(); + + DefaultInflightRepository toTest = new DefaultInflightRepository(); + Endpoint[] eps = new Endpoint[TOTAL_ENDPOINTS]; + + for (int i = 0; i < eps.length; i++) { + // create TOTAL_ENDPOINTS endpoints + Endpoint endpoint = new DefaultEndpoint() { + final String uri = "def:" + System.nanoTime(); + @Override + public String getEndpointUri() { + return uri; + } + + public boolean isSingleton() { + return false; + } + + public Producer createProducer() throws Exception { + return null; + } + + public Consumer createConsumer(Processor processor) throws Exception { + return null; + } + }; + eps[i] = endpoint; + } + + AtomicInteger locker = new AtomicInteger(0); + + Thread[] ts = new Thread[THREAD_COUNT]; + for (int i = 0; i < ts.length; i++) { + TypicalConsumer consumer = new TypicalConsumer(); + consumer.eps = eps; + consumer.repo = toTest; + consumer.locker = locker; + ts[i] = new Thread(consumer); + } + + for (int i = 0; i < ts.length; i++) { + ts[i].start(); + } + Thread.sleep(1000); + while (locker.get() > 0) { + synchronized (locker) { + locker.wait(); + } + } + + if (failure) { + throw new Exception("Failed to properly track endpoints"); + } + + for (Endpoint ep : eps) { + Assert.assertTrue("Size MUST be 0", 0 == toTest.size(ep)); + } + + if (toTest.size() > 0) { + throw new Exception("Test either incomplete or tracking failed"); + } - Exchange e2 = new DefaultExchange(context); - repo.add(e2); - assertEquals(2, repo.size()); - - repo.remove(e2); - assertEquals(1, repo.size()); + Assert.assertTrue("Must not have any references left", 0 == toTest.endpointSize()); + } - repo.remove(e1); - assertEquals(0, repo.size()); + private static class TypicalConsumer implements Runnable { + Endpoint[] eps; + DefaultInflightRepository repo; + Random rand = new Random(System.nanoTime()); + AtomicInteger locker; + + public void run() { + synchronized (locker) { + locker.incrementAndGet(); + } + try { + for (int i = 0; i < LOOP_COUNT; i++) { + Endpoint ep = eps[Math.abs(rand.nextInt() % eps.length)]; + ep.setCamelContext(context); + Exchange ex = new DefaultExchange(ep); + repo.add(ex); + int size = repo.size(ep); + if (size <= 0) { + failure = true; + } + repo.remove(ex); + } + } catch (Exception e) { + failure = true; + } + + synchronized (locker) { + locker.decrementAndGet(); + locker.notifyAll(); + } + } } }