Author: dkulp Date: Fri Jul 8 19:32:45 2011 New Revision: 1144438 URL: http://svn.apache.org/viewvc?rev=1144438&view=rev Log: Merged revisions 1090564 via svnmerge from https://svn.apache.org/repos/asf/camel/trunk
........ r1090564 | davsclaus | 2011-04-09 07:09:28 -0400 (Sat, 09 Apr 2011) | 1 line CAMEL-3850: splitter aggregate on-the-fly task is now more responsive in parallel mode. ........ Added: camel/branches/camel-2.7.x/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelIssueTest.java - copied unchanged from r1090564, camel/trunk/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelIssueTest.java Modified: camel/branches/camel-2.7.x/ (props changed) camel/branches/camel-2.7.x/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Propchange: camel/branches/camel-2.7.x/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Fri Jul 8 19:32:45 2011 @@ -1 +1 @@ -/camel/trunk:1083696,1083723-1083724,1084150,1085277,1085543,1085549,1085905,1085909,1086165,1086231,1087005,1087276,1087612,1087620,1087856,1088583,1088916-1088917,1089275,1089348,1090166,1090960-1090969,1091082,1091518,1091771,1091799,1092068,1092577,1092667,1093978,1094147,1094156,1095405,1095469,1095471,1095475-1095476,1096346,1097909,1097912,1097978,1098630,1099417,1100975,1102162,1102181,1104076,1124497,1127744,1127988,1131411,1134252,1134501,1135223,1135364,1136290,1138285,1139163,1140096-1140102,1141783,1143925,1144248,1144324 +/camel/trunk:1083696,1083723-1083724,1084150,1085277,1085543,1085549,1085905,1085909,1086165,1086231,1087005,1087276,1087612,1087620,1087856,1088583,1088916-1088917,1089275,1089348,1090166,1090564,1090960-1090969,1091082,1091518,1091771,1091799,1092068,1092577,1092667,1093978,1094147,1094156,1095405,1095469,1095471,1095475-1095476,1096346,1097909,1097912,1097978,1098630,1099417,1100975,1102162,1102181,1104076,1124497,1127744,1127988,1131411,1134252,1134501,1135223,1135364,1136290,1138285,1139163,1140096-1140102,1141783,1143925,1144248,1144324 Propchange: camel/branches/camel-2.7.x/ ------------------------------------------------------------------------------ Binary 
property 'svnmerge-integrated' - no diff available. Modified: camel/branches/camel-2.7.x/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.7.x/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1144438&r1=1144437&r2=1144438&view=diff ============================================================================== --- camel/branches/camel-2.7.x/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original) +++ camel/branches/camel-2.7.x/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Fri Jul 8 19:32:45 2011 @@ -258,11 +258,9 @@ public class MulticastProcessor extends // issue task to execute in separate thread so it can aggregate on-the-fly // while we submit new tasks, and those tasks complete concurrently // this allows us to optimize work and reduce memory consumption - AggregateOnTheFlyTask task = new AggregateOnTheFlyTask(result, original, total, completion, running, + final AggregateOnTheFlyTask aggregateOnTheFlyTask = new AggregateOnTheFlyTask(result, original, total, completion, running, aggregationOnTheFlyDone, allTasksSubmitted, executionException); - - // and start the aggregation task so we can aggregate on-the-fly - aggregateExecutorService.submit(task); + final AtomicBoolean aggregationTaskSubmitted = new AtomicBoolean(); LOG.trace("Starting to submit parallel tasks"); @@ -273,6 +271,13 @@ public class MulticastProcessor extends completion.submit(new Callable<Exchange>() { public Exchange call() throws Exception { + // only start the aggregation task when the task is being executed to avoid starting + // the aggregation task too early and piling up too many threads + if (aggregationTaskSubmitted.compareAndSet(false, true)) { + // but only submit the task once + aggregateExecutorService.submit(aggregateOnTheFlyTask); + } + if (!running.get()) { // do not start processing the task if we are not 
running return subExchange; @@ -452,7 +457,7 @@ public class MulticastProcessor extends ((TimeoutAwareAggregationStrategy) strategy).timeout(oldExchange, aggregated, total.intValue(), timeout); } else { // log a WARN we timed out since it will not be aggregated and the Exchange will be lost - LOG.warn("Parallel processing timed out after " + timeout + " millis for number " + aggregated + ". This task will be cancelled and will not be aggregated."); + LOG.warn("Parallel processing timed out after {} millis for number {}. This task will be cancelled and will not be aggregated.", timeout, aggregated); } if (LOG.isDebugEnabled()) { LOG.debug("Timeout occurred after " + timeout + " millis for number " + aggregated + " task."); @@ -884,9 +889,7 @@ public class MulticastProcessor extends if (isParallelProcessing() && aggregateExecutorService == null) { // use unbounded thread pool so we ensure the aggregate on-the-fly task always will have assigned a thread // and run the tasks when the task is submitted. If not then the aggregate task may not be able to run - // and signal completion during processing, which would lead to a dead-lock - // keep at least one thread in the pool so we re-use the thread avoiding to create new threads because - // the pool shrank to zero. 
+ // and signal completion during processing, which would lead to what would appear as a dead-lock or slow processing String name = getClass().getSimpleName() + "-AggregateTask"; aggregateExecutorService = createAggregateExecutorService(name); } @@ -901,7 +904,8 @@ public class MulticastProcessor extends * @return the thread pool */ protected synchronized ExecutorService createAggregateExecutorService(String name) { - return camelContext.getExecutorServiceStrategy().newThreadPool(this, name, 1, Integer.MAX_VALUE); + // use a cached thread pool so each on-the-fly task has a dedicated thread to process completions as they come in + return camelContext.getExecutorServiceStrategy().newCachedThreadPool(this, name); } protected void doStop() throws Exception {