Repository: camel Updated Branches: refs/heads/camel-2.15.x fa726cd0e -> 943b555d6 refs/heads/master 8e18c032a -> 37d1f0894
CAMEL-8713: ParallelAggregate option when using parallel mode does not run in parallel Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/37d1f089 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/37d1f089 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/37d1f089 Branch: refs/heads/master Commit: 37d1f08945cf73397ab73c9f71d786c65246659e Parents: 8e18c03 Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Apr 28 17:17:28 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Apr 28 20:01:36 2015 +0200 ---------------------------------------------------------------------- .../camel/processor/MulticastProcessor.java | 124 ++++++++++++++----- 1 file changed, 93 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/37d1f089/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java index 1cd86da..334ceb1 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -362,7 +362,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor } /** - * Task to aggregate on-the-fly for completed tasks when using parallel processing. + * Boss worker to control aggregate on-the-fly for completed tasks when using parallel processing. * <p/> * This ensures lower memory consumption as we do not need to keep all completed tasks in memory * before we perform aggregation. Instead this separate thread will run and aggregate when new @@ -417,21 +417,21 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor } private void aggregateOnTheFly() throws InterruptedException, ExecutionException { - boolean timedOut = false; + final AtomicBoolean timedOut = new AtomicBoolean(); boolean stoppedOnException = false; final StopWatch watch = new StopWatch(); - int aggregated = 0; + final AtomicInteger aggregated = new AtomicInteger(); boolean done = false; // not a for loop as on the fly may still run while (!done) { // check if we have already aggregate everything - if (allTasksSubmitted.get() && aggregated >= total.get()) { + if (allTasksSubmitted.get() && aggregated.intValue() >= total.get()) { LOG.debug("Done aggregating {} exchanges on the fly.", aggregated); break; } Future<Exchange> future; - if (timedOut) { + if (timedOut.get()) { // we are timed out but try to grab if some tasks has been completed // poll will return null if no tasks is present future = completion.poll(); @@ -454,27 +454,12 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor } if (future == null) { - // timeout occurred - AggregationStrategy strategy = getAggregationStrategy(null); - if (strategy instanceof TimeoutAwareAggregationStrategy) { - // notify the strategy we timed out - Exchange oldExchange = result.get(); - if (oldExchange == null) { - // if they all timed out the result may not have been set yet, so use the original exchange - oldExchange = original; - } - ((TimeoutAwareAggregationStrategy) strategy).timeout(oldExchange, aggregated, total.intValue(), timeout); + ParallelAggregateTimeoutTask task = new ParallelAggregateTimeoutTask(original, result, completion, aggregated, total, timedOut); + if (parallelAggregate) { + aggregateExecutorService.submit(task); } else { - // log a WARN we timed out since it will not be aggregated and the Exchange will be lost - LOG.warn("Parallel processing timed out after {} millis for number {}. This task will be cancelled and will not be aggregated.", timeout, aggregated); - } - LOG.debug("Timeout occurred after {} millis for number {} task.", timeout, aggregated); - timedOut = true; - - // mark that index as timed out, which allows us to try to retrieve - // any already completed tasks in the next loop - if (completion instanceof SubmitOrderedCompletionService) { - ((SubmitOrderedCompletionService<?>) completion).timeoutTask(); + // in non parallel mode then just run the task + task.run(); } } else { // there is a result to aggregate @@ -493,18 +478,18 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor } // we got a result so aggregate it + ParallelAggregateTask task = new ParallelAggregateTask(result, subExchange, aggregated); if (parallelAggregate) { - doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange); + aggregateExecutorService.submit(task); } else { - doAggregate(getAggregationStrategy(subExchange), result, subExchange); + // in non parallel mode then just run the task + task.run(); } } - - aggregated++; } - if (timedOut || stoppedOnException) { - if (timedOut) { + if (timedOut.get() || stoppedOnException) { + if (timedOut.get()) { LOG.debug("Cancelling tasks due timeout after {} millis.", timeout); } if (stoppedOnException) { @@ -516,6 +501,83 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor } } + /** + * Worker task to aggregate the old and new exchange on-the-fly for completed tasks when using parallel processing. + */ + private final class ParallelAggregateTask implements Runnable { + + private final AtomicExchange result; + private final Exchange subExchange; + private final AtomicInteger aggregated; + + private ParallelAggregateTask(AtomicExchange result, Exchange subExchange, AtomicInteger aggregated) { + this.result = result; + this.subExchange = subExchange; + this.aggregated = aggregated; + } + + @Override + public void run() { + if (parallelAggregate) { + doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange); + } else { + doAggregate(getAggregationStrategy(subExchange), result, subExchange); + } + aggregated.incrementAndGet(); + } + } + + /** + * Worker task to aggregate the old and new exchange on-the-fly for completed tasks when using parallel processing. + */ + private final class ParallelAggregateTimeoutTask implements Runnable { + + private final Exchange original; + private final AtomicExchange result; + private final CompletionService<Exchange> completion; + private final AtomicInteger aggregated; + private final AtomicInteger total; + private final AtomicBoolean timedOut; + + private ParallelAggregateTimeoutTask(Exchange original, AtomicExchange result, CompletionService<Exchange> completion, + AtomicInteger aggregated, AtomicInteger total, AtomicBoolean timedOut) { + this.original = original; + this.result = result; + this.completion = completion; + this.aggregated = aggregated; + this.total = total; + this.timedOut = timedOut; + } + + @Override + public void run() { + AggregationStrategy strategy = getAggregationStrategy(null); + if (strategy instanceof TimeoutAwareAggregationStrategy) { + // notify the strategy we timed out + Exchange oldExchange = result.get(); + if (oldExchange == null) { + // if they all timed out the result may not have been set yet, so use the original exchange + oldExchange = original; + } + ((TimeoutAwareAggregationStrategy) strategy).timeout(oldExchange, aggregated.intValue(), total.intValue(), timeout); + } else { + // log a WARN we timed out since it will not be aggregated and the Exchange will be lost + LOG.warn("Parallel processing timed out after {} millis for number {}. This task will be cancelled and will not be aggregated.", timeout, aggregated.intValue()); + } + LOG.debug("Timeout occurred after {} millis for number {} task.", timeout, aggregated.intValue()); + timedOut.set(true); + + // mark that index as timed out, which allows us to try to retrieve + // any already completed tasks in the next loop + if (completion instanceof SubmitOrderedCompletionService) { + ((SubmitOrderedCompletionService<?>) completion).timeoutTask(); + } + + // we timed out so increment the counter + aggregated.incrementAndGet(); + } + } + protected boolean doProcessSequential(Exchange original, AtomicExchange result, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) throws Exception { AtomicInteger total = new AtomicInteger(); Iterator<ProcessorExchangePair> it = pairs.iterator();