Author: davsclaus Date: Wed Mar 7 22:15:50 2012 New Revision: 1298162 URL: http://svn.apache.org/viewvc?rev=1298162&view=rev Log: CAMEL-5058: Fix potential leak in inflight registry if sending to many unique endpoints, not from a routes. Thanks to Zach Calvert for the patch.
Added: camel/branches/camel-2.9.x/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryConcurrentTest.java - copied unchanged from r1298155, camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryConcurrentTest.java Modified: camel/branches/camel-2.9.x/ (props changed) camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java Propchange: camel/branches/camel-2.9.x/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Mar 7 22:15:50 2012 @@ -1 +1 @@ -/camel/trunk:1243046,1243057,1243234,1244518,1244644,1244859,1244861,1244864,1244870,1244872,1245021,1291555,1291727,1291848,1291864,1292114,1292384,1292725,1292760,1292767,1293079,1293268,1293288,1293330,1293590,1293828,1293852,1293855,1294130,1294482,1294502,1294533,1294588,1294639,1294709,1294909,1294976,1295073,1295108,1295120,1296653,1296790,1298125 +/camel/trunk:1243046,1243057,1243234,1244518,1244644,1244859,1244861,1244864,1244870,1244872,1245021,1291555,1291727,1291848,1291864,1292114,1292384,1292725,1292760,1292767,1293079,1293268,1293288,1293330,1293590,1293828,1293852,1293855,1294130,1294482,1294502,1294533,1294588,1294639,1294709,1294909,1294976,1295073,1295108,1295120,1296653,1296790,1298125,1298155 Propchange: camel/branches/camel-2.9.x/ ------------------------------------------------------------------------------ Binary property 'svnmerge-integrated' - no diff available. Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java?rev=1298162&r1=1298161&r2=1298162&view=diff ============================================================================== --- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java (original) +++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java Wed Mar 7 22:15:50 2012 @@ -16,7 +16,8 @@ */ package org.apache.camel.impl; -import java.util.concurrent.ConcurrentHashMap; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.Endpoint; @@ -36,7 +37,7 @@ public class DefaultInflightRepository e private static final transient Logger LOG = LoggerFactory.getLogger(DefaultInflightRepository.class); private final AtomicInteger totalCount = new AtomicInteger(); // use endpoint key as key so endpoints with lenient properties is registered using the same key (eg dynamic http endpoints) - private final ConcurrentHashMap<String, AtomicInteger> endpointCount = new ConcurrentHashMap<String, AtomicInteger>(); + private final Map<String, AtomicInteger> endpointCount = new HashMap<String, AtomicInteger>(); public void add(Exchange exchange) { int count = totalCount.incrementAndGet(); @@ -47,9 +48,14 @@ public class DefaultInflightRepository e } String key = exchange.getFromEndpoint().getEndpointKey(); - AtomicInteger existing = endpointCount.putIfAbsent(key, new AtomicInteger(1)); - if (existing != null) { - existing.addAndGet(1); + // need to be synchronized as we can concurrently add/remove + synchronized (endpointCount) { + AtomicInteger existing = endpointCount.get(key); + if (existing != null) { + existing.incrementAndGet(); + } else { + endpointCount.put(key, new AtomicInteger(1)); + } } } @@ -62,12 +68,24 @@ public class DefaultInflightRepository e } String key = exchange.getFromEndpoint().getEndpointKey(); - AtomicInteger existing = endpointCount.get(key); - if (existing != null) { - existing.addAndGet(-1); + // need to be synchronized as we can concurrently add/remove + synchronized (endpointCount) { + AtomicInteger existing = endpointCount.get(key); + if (existing != null) { + if (existing.decrementAndGet() <= 0) { + endpointCount.remove(key); + } + } } } + /** + * Internal only - Used for testing purpose. + */ + int endpointSize() { + return endpointCount.size(); + } + public int size() { return totalCount.get(); } @@ -89,6 +107,8 @@ public class DefaultInflightRepository e } else { LOG.info("Shutting down with no inflight exchanges."); } - endpointCount.clear(); + synchronized (endpointCount) { + endpointCount.clear(); + } } }