Author: davsclaus
Date: Wed Mar  7 22:15:50 2012
New Revision: 1298162

URL: http://svn.apache.org/viewvc?rev=1298162&view=rev
Log:
CAMEL-5058: Fix potential leak in inflight registry if sending to many unique 
endpoints, not from a route. Thanks to Zach Calvert for the patch.

Added:
    
camel/branches/camel-2.9.x/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryConcurrentTest.java
      - copied unchanged from r1298155, 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryConcurrentTest.java
Modified:
    camel/branches/camel-2.9.x/   (props changed)
    
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java

Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Mar  7 22:15:50 2012
@@ -1 +1 @@
-/camel/trunk:1243046,1243057,1243234,1244518,1244644,1244859,1244861,1244864,1244870,1244872,1245021,1291555,1291727,1291848,1291864,1292114,1292384,1292725,1292760,1292767,1293079,1293268,1293288,1293330,1293590,1293828,1293852,1293855,1294130,1294482,1294502,1294533,1294588,1294639,1294709,1294909,1294976,1295073,1295108,1295120,1296653,1296790,1298125
+/camel/trunk:1243046,1243057,1243234,1244518,1244644,1244859,1244861,1244864,1244870,1244872,1245021,1291555,1291727,1291848,1291864,1292114,1292384,1292725,1292760,1292767,1293079,1293268,1293288,1293330,1293590,1293828,1293852,1293855,1294130,1294482,1294502,1294533,1294588,1294639,1294709,1294909,1294976,1295073,1295108,1295120,1296653,1296790,1298125,1298155

Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: 
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java?rev=1298162&r1=1298161&r2=1298162&view=diff
==============================================================================
--- 
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
 (original)
+++ 
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
 Wed Mar  7 22:15:50 2012
@@ -16,7 +16,8 @@
  */
 package org.apache.camel.impl;
 
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.Endpoint;
@@ -36,7 +37,7 @@ public class DefaultInflightRepository e
     private static final transient Logger LOG = 
LoggerFactory.getLogger(DefaultInflightRepository.class);
     private final AtomicInteger totalCount = new AtomicInteger();
     // use endpoint key as key so endpoints with lenient properties is 
registered using the same key (eg dynamic http endpoints)
-    private final ConcurrentHashMap<String, AtomicInteger> endpointCount = new 
ConcurrentHashMap<String, AtomicInteger>();
+    private final Map<String, AtomicInteger> endpointCount = new 
HashMap<String, AtomicInteger>();
 
     public void add(Exchange exchange) {
         int count = totalCount.incrementAndGet();
@@ -47,9 +48,14 @@ public class DefaultInflightRepository e
         }
 
         String key = exchange.getFromEndpoint().getEndpointKey();
-        AtomicInteger existing = endpointCount.putIfAbsent(key, new 
AtomicInteger(1));
-        if (existing != null) {
-            existing.addAndGet(1);
+        // need to be synchronized as we can concurrently add/remove
+        synchronized (endpointCount) {
+            AtomicInteger existing = endpointCount.get(key);
+            if (existing != null) {
+                existing.incrementAndGet();
+            } else {
+                endpointCount.put(key, new AtomicInteger(1));
+            }
         }
     }
 
@@ -62,12 +68,24 @@ public class DefaultInflightRepository e
         }
 
         String key = exchange.getFromEndpoint().getEndpointKey();
-        AtomicInteger existing = endpointCount.get(key);
-        if (existing != null) {
-            existing.addAndGet(-1);
+        // need to be synchronized as we can concurrently add/remove
+        synchronized (endpointCount) {
+            AtomicInteger existing = endpointCount.get(key);
+            if (existing != null) {
+                if (existing.decrementAndGet() <= 0) {
+                    endpointCount.remove(key);
+                }
+            }
         }
     }
 
+    /**
+     * Internal only - Used for testing purpose.
+     */
+    int endpointSize() {
+        return endpointCount.size();
+    }
+
     public int size() {
         return totalCount.get();
     }
@@ -89,6 +107,8 @@ public class DefaultInflightRepository e
         } else {
             LOG.info("Shutting down with no inflight exchanges.");
         }
-        endpointCount.clear();
+        synchronized (endpointCount) {
+            endpointCount.clear();
+        }
     }
 }


Reply via email to