Author: dkulp
Date: Fri Jul  8 19:32:45 2011
New Revision: 1144438

URL: http://svn.apache.org/viewvc?rev=1144438&view=rev
Log:
Merged revisions 1090564 via svnmerge from 
https://svn.apache.org/repos/asf/camel/trunk

........
  r1090564 | davsclaus | 2011-04-09 07:09:28 -0400 (Sat, 09 Apr 2011) | 1 line
  
  CAMEL-3850: splitter aggregate on-the-fly task is now more responsive in 
parallel mode.
........

Added:
    
camel/branches/camel-2.7.x/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelIssueTest.java
      - copied unchanged from r1090564, 
camel/trunk/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelIssueTest.java
Modified:
    camel/branches/camel-2.7.x/   (props changed)
    
camel/branches/camel-2.7.x/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java

Propchange: camel/branches/camel-2.7.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jul  8 19:32:45 2011
@@ -1 +1 @@
-/camel/trunk:1083696,1083723-1083724,1084150,1085277,1085543,1085549,1085905,1085909,1086165,1086231,1087005,1087276,1087612,1087620,1087856,1088583,1088916-1088917,1089275,1089348,1090166,1090960-1090969,1091082,1091518,1091771,1091799,1092068,1092577,1092667,1093978,1094147,1094156,1095405,1095469,1095471,1095475-1095476,1096346,1097909,1097912,1097978,1098630,1099417,1100975,1102162,1102181,1104076,1124497,1127744,1127988,1131411,1134252,1134501,1135223,1135364,1136290,1138285,1139163,1140096-1140102,1141783,1143925,1144248,1144324
+/camel/trunk:1083696,1083723-1083724,1084150,1085277,1085543,1085549,1085905,1085909,1086165,1086231,1087005,1087276,1087612,1087620,1087856,1088583,1088916-1088917,1089275,1089348,1090166,1090564,1090960-1090969,1091082,1091518,1091771,1091799,1092068,1092577,1092667,1093978,1094147,1094156,1095405,1095469,1095471,1095475-1095476,1096346,1097909,1097912,1097978,1098630,1099417,1100975,1102162,1102181,1104076,1124497,1127744,1127988,1131411,1134252,1134501,1135223,1135364,1136290,1138285,1139163,1140096-1140102,1141783,1143925,1144248,1144324

Propchange: camel/branches/camel-2.7.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: 
camel/branches/camel-2.7.x/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.7.x/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1144438&r1=1144437&r2=1144438&view=diff
==============================================================================
--- 
camel/branches/camel-2.7.x/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 (original)
+++ 
camel/branches/camel-2.7.x/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 Fri Jul  8 19:32:45 2011
@@ -258,11 +258,9 @@ public class MulticastProcessor extends 
             // issue task to execute in separate thread so it can aggregate 
on-the-fly
             // while we submit new tasks, and those tasks complete concurrently
             // this allows us to optimize work and reduce memory consumption
-            AggregateOnTheFlyTask task = new AggregateOnTheFlyTask(result, 
original, total, completion, running,
+            final AggregateOnTheFlyTask aggregateOnTheFlyTask = new 
AggregateOnTheFlyTask(result, original, total, completion, running,
                     aggregationOnTheFlyDone, allTasksSubmitted, 
executionException);
-
-            // and start the aggregation task so we can aggregate on-the-fly
-            aggregateExecutorService.submit(task);
+            final AtomicBoolean aggregationTaskSubmitted = new AtomicBoolean();
 
             LOG.trace("Starting to submit parallel tasks");
 
@@ -273,6 +271,13 @@ public class MulticastProcessor extends 
 
                 completion.submit(new Callable<Exchange>() {
                     public Exchange call() throws Exception {
+                        // only start the aggregation task when the task is 
being executed to avoid staring
+                        // the aggregation task to early and pile up too many 
threads
+                        if (aggregationTaskSubmitted.compareAndSet(false, 
true)) {
+                            // but only submit the task once
+                            
aggregateExecutorService.submit(aggregateOnTheFlyTask);
+                        }
+
                         if (!running.get()) {
                             // do not start processing the task if we are not 
running
                             return subExchange;
@@ -452,7 +457,7 @@ public class MulticastProcessor extends 
                         ((TimeoutAwareAggregationStrategy) 
strategy).timeout(oldExchange, aggregated, total.intValue(), timeout);
                     } else {
                         // log a WARN we timed out since it will not be 
aggregated and the Exchange will be lost
-                        LOG.warn("Parallel processing timed out after " + 
timeout + " millis for number " + aggregated + ". This task will be cancelled 
and will not be aggregated.");
+                        LOG.warn("Parallel processing timed out after {} 
millis for number {}. This task will be cancelled and will not be aggregated.", 
timeout, aggregated);
                     }
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Timeout occurred after " + timeout + " 
millis for number " + aggregated + " task.");
@@ -884,9 +889,7 @@ public class MulticastProcessor extends 
         if (isParallelProcessing() && aggregateExecutorService == null) {
             // use unbounded thread pool so we ensure the aggregate on-the-fly 
task always will have assigned a thread
             // and run the tasks when the task is submitted. If not then the 
aggregate task may not be able to run
-            // and signal completion during processing, which would lead to a 
dead-lock
-            // keep at least one thread in the pool so we re-use the thread 
avoiding to create new threads because
-            // the pool shrank to zero.
+            // and signal completion during processing, which would lead to 
what would appear as a dead-lock or a slow processing
             String name = getClass().getSimpleName() + "-AggregateTask";
             aggregateExecutorService = createAggregateExecutorService(name);
         }
@@ -901,7 +904,8 @@ public class MulticastProcessor extends 
      * @return the thread pool
      */
     protected synchronized ExecutorService 
createAggregateExecutorService(String name) {
-        return camelContext.getExecutorServiceStrategy().newThreadPool(this, 
name, 1, Integer.MAX_VALUE);
+        // use a cached thread pool so we each on-the-fly task has a dedicated 
thread to process completions as they come in
+        return 
camelContext.getExecutorServiceStrategy().newCachedThreadPool(this, name);
     }
 
     protected void doStop() throws Exception {


Reply via email to