Repository: camel Updated Branches: refs/heads/master c9a408b20 -> f1dbb4852
CAMEL-7521: Added parallelAggregate option to mutlicast/splitter/recipient list eips. Thanks to Jerry Williamson for the patch. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f1dbb485 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f1dbb485 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f1dbb485 Branch: refs/heads/master Commit: f1dbb4852da4d5848e35e9ab21626cdd8cdf7216 Parents: c9a408b Author: Claus Ibsen <[email protected]> Authored: Fri Jul 4 13:30:42 2014 +0200 Committer: Claus Ibsen <[email protected]> Committed: Fri Jul 4 13:31:56 2014 +0200 ---------------------------------------------------------------------- .../java/org/apache/camel/RecipientList.java | 1 + .../apache/camel/component/bean/MethodInfo.java | 1 + .../apache/camel/model/MulticastDefinition.java | 37 ++++- .../camel/model/RecipientListDefinition.java | 38 +++++- .../org/apache/camel/model/SplitDefinition.java | 35 ++++- .../camel/processor/MulticastProcessor.java | 45 ++++++- .../apache/camel/processor/RecipientList.java | 11 +- .../camel/processor/RecipientListProcessor.java | 12 +- .../org/apache/camel/processor/Splitter.java | 11 +- .../SplitterParallelAggregateTest.java | 135 +++++++++++++++++++ .../apache/camel/processor/SplitterTest.java | 21 +++ 11 files changed, 335 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/f1dbb485/camel-core/src/main/java/org/apache/camel/RecipientList.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/RecipientList.java b/camel-core/src/main/java/org/apache/camel/RecipientList.java index e72f025..7cd9cda 100644 --- a/camel-core/src/main/java/org/apache/camel/RecipientList.java +++ b/camel-core/src/main/java/org/apache/camel/RecipientList.java @@ -46,6 +46,7 @@ public @interface RecipientList { String context() default ""; String delimiter() default ","; boolean parallelProcessing() default false; + boolean parallelAggregate() default false; boolean stopOnException() default false; boolean streaming() default false; boolean ignoreInvalidEndpoints() default false; http://git-wip-us.apache.org/repos/asf/camel/blob/f1dbb485/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java b/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java index 0c22e75..25d6b02 100644 --- a/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java +++ b/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java @@ -159,6 +159,7 @@ public class MethodInfo { recipientList.setStopOnException(recipientListAnnotation.stopOnException()); recipientList.setIgnoreInvalidEndpoints(recipientListAnnotation.ignoreInvalidEndpoints()); recipientList.setParallelProcessing(recipientListAnnotation.parallelProcessing()); + recipientList.setParallelAggregate(recipientListAnnotation.parallelAggregate()); recipientList.setStreaming(recipientListAnnotation.streaming()); recipientList.setTimeout(recipientListAnnotation.timeout()); recipientList.setShareUnitOfWork(recipientListAnnotation.shareUnitOfWork()); http://git-wip-us.apache.org/repos/asf/camel/blob/f1dbb485/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java index 1c6845f..9f5b606 100644 --- a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java @@ -69,6 +69,8 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i private Processor onPrepare; @XmlAttribute private Boolean shareUnitOfWork; + @XmlAttribute + private Boolean parallelAggregate; public MulticastDefinition() { } @@ -155,7 +157,20 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i setParallelProcessing(true); return this; } - + + /** + * Doing the aggregate work in parallel + * <p/> + * Notice that if enabled, then the {@link org.apache.camel.processor.aggregate.AggregationStrategy} in use + * must be implemented as thread safe, as concurrent threads can call the <tt>aggregate</tt> methods at the same time. + * + * @return the builder + */ + public MulticastDefinition parallelAggregate() { + setParallelAggregate(true); + return this; + } + /** * Aggregates the responses as the are done (e.g. out of order sequence) * @@ -261,7 +276,7 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i } MulticastProcessor answer = new MulticastProcessor(routeContext.getCamelContext(), list, strategy, isParallelProcessing(), - threadPool, shutdownThreadPool, isStreaming(), isStopOnException(), timeout, onPrepare, isShareUnitOfWork()); + threadPool, shutdownThreadPool, isStreaming(), isStopOnException(), timeout, onPrepare, isShareUnitOfWork(), isParallelAggregate()); if (isShareUnitOfWork()) { // wrap answer in a sub unit of work, since we share the unit of work CamelInternalProcessor internalProcessor = new CamelInternalProcessor(answer); @@ -418,4 +433,22 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i return shareUnitOfWork != null && shareUnitOfWork; } + public Boolean getParallelAggregate() { + return parallelAggregate; + } + + /** + * Whether to aggregate using a sequential single thread, or allow parallel aggregation. + * <p/> + * Notice that if enabled, then the {@link org.apache.camel.processor.aggregate.AggregationStrategy} in use + * must be implemented as thread safe, as concurrent threads can call the <tt>aggregate</tt> methods at the same time. + */ + public boolean isParallelAggregate() { + return parallelAggregate != null && parallelAggregate; + } + + public void setParallelAggregate(Boolean parallelAggregate) { + this.parallelAggregate = parallelAggregate; + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/f1dbb485/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java index c5b5796..b3b6ab5 100644 --- a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java @@ -78,6 +78,8 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext private Boolean shareUnitOfWork; @XmlAttribute private Integer cacheSize; + @XmlAttribute + private Boolean parallelAggregate; public RecipientListDefinition() { } @@ -117,7 +119,8 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext } answer.setAggregationStrategy(createAggregationStrategy(routeContext)); answer.setParallelProcessing(isParallelProcessing()); - answer.setStreaming(isStreaming()); + answer.setParallelAggregate(isParallelAggregate()); + answer.setStreaming(isStreaming()); answer.setShareUnitOfWork(isShareUnitOfWork()); if (getCacheSize() != null) { answer.setCacheSize(getCacheSize()); @@ -284,7 +287,20 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext setParallelProcessing(true); return this; } - + + /** + * Doing the aggregate work in parallel + * <p/> + * Notice that if enabled, then the {@link org.apache.camel.processor.aggregate.AggregationStrategy} in use + * must be implemented as thread safe, as concurrent threads can call the <tt>aggregate</tt> methods at the same time. + * + * @return the builder + */ + public RecipientListDefinition<Type> parallelAggregate() { + setParallelAggregate(true); + return this; + } + /** * Doing the recipient list work in streaming model * @@ -533,4 +549,22 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext public void setCacheSize(Integer cacheSize) { this.cacheSize = cacheSize; } + + public Boolean getParallelAggregate() { + return parallelAggregate; + } + + /** + * Whether to aggregate using a sequential single thread, or allow parallel aggregation. + * <p/> + * Notice that if enabled, then the {@link org.apache.camel.processor.aggregate.AggregationStrategy} in use + * must be implemented as thread safe, as concurrent threads can call the <tt>aggregate</tt> methods at the same time. + */ + public boolean isParallelAggregate() { + return parallelAggregate != null && parallelAggregate; + } + + public void setParallelAggregate(Boolean parallelAggregate) { + this.parallelAggregate = parallelAggregate; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/f1dbb485/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java b/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java index 0b54bdd..aa1a102 100644 --- a/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java @@ -69,6 +69,8 @@ public class SplitDefinition extends ExpressionNode implements ExecutorServiceAw private Processor onPrepare; @XmlAttribute private Boolean shareUnitOfWork; + @XmlAttribute + private Boolean parallelAggregate; public SplitDefinition() { } @@ -116,7 +118,7 @@ public class SplitDefinition extends ExpressionNode implements ExecutorServiceAw Splitter answer = new Splitter(routeContext.getCamelContext(), exp, childProcessor, aggregationStrategy, isParallelProcessing(), threadPool, shutdownThreadPool, isStreaming(), isStopOnException(), - timeout, onPrepare, isShareUnitOfWork()); + timeout, onPrepare, isShareUnitOfWork(), isParallelAggregate()); if (isShareUnitOfWork()) { // wrap answer in a sub unit of work, since we share the unit of work CamelInternalProcessor internalProcessor = new CamelInternalProcessor(answer); @@ -207,6 +209,19 @@ public class SplitDefinition extends ExpressionNode implements ExecutorServiceAw } /** + * Doing the aggregate work in parallel + * <p/> + * Notice that if enabled, then the {@link org.apache.camel.processor.aggregate.AggregationStrategy} in use + * must be implemented as thread safe, as concurrent threads can call the <tt>aggregate</tt> methods at the same time. + * + * @return the builder + */ + public SplitDefinition parallelAggregate() { + setParallelAggregate(true); + return this; + } + + /** * Enables streaming. * See {@link org.apache.camel.model.SplitDefinition#isStreaming()} for more information * @@ -335,6 +350,24 @@ public class SplitDefinition extends ExpressionNode implements ExecutorServiceAw return streaming != null && streaming; } + public Boolean getParallelAggregate() { + return parallelAggregate; + } + + /** + * Whether to aggregate using a sequential single thread, or allow parallel aggregation. + * <p/> + * Notice that if enabled, then the {@link org.apache.camel.processor.aggregate.AggregationStrategy} in use + * must be implemented as thread safe, as concurrent threads can call the <tt>aggregate</tt> methods at the same time. + */ + public boolean isParallelAggregate() { + return parallelAggregate != null && parallelAggregate; + } + + public void setParallelAggregate(Boolean parallelAggregate) { + this.parallelAggregate = parallelAggregate; + } + public Boolean getStopOnException() { return stopOnException; } http://git-wip-us.apache.org/repos/asf/camel/blob/f1dbb485/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java index 697ae32..30c8a55 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -147,6 +147,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor private final AggregationStrategy aggregationStrategy; private final boolean parallelProcessing; private final boolean streaming; + private final boolean parallelAggregate; private final boolean stopOnException; private final ExecutorService executorService; private final boolean shutdownExecutorService; @@ -160,12 +161,21 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor } public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy) { - this(camelContext, processors, aggregationStrategy, false, null, false, false, false, 0, null, false); + this(camelContext, processors, aggregationStrategy, false, null, false, false, false, 0, null, false, false); } + @Deprecated public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork) { + this(camelContext, processors, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, + streaming, stopOnException, timeout, onPrepare, shareUnitOfWork, false); + } + + public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy, + boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, + boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork, + boolean parallelAggregate) { notNull(camelContext, "camelContext"); this.camelContext = camelContext; this.processors = processors; @@ -179,6 +189,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor this.timeout = timeout; this.onPrepare = onPrepare; this.shareUnitOfWork = shareUnitOfWork; + this.parallelAggregate = parallelAggregate; } @Override @@ -535,7 +546,12 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor LOG.trace("Sequential processing complete for number {} exchange: {}", total, subExchange); - doAggregate(getAggregationStrategy(subExchange), result, subExchange); + if (parallelAggregate) { + doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange); + } else { + doAggregate(getAggregationStrategy(subExchange), result, subExchange); + } + total.incrementAndGet(); } @@ -610,7 +626,11 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor } try { - doAggregate(getAggregationStrategy(subExchange), result, subExchange); + if (parallelAggregate) { + doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange); + } else { + doAggregate(getAggregationStrategy(subExchange), result, subExchange); + } } catch (Throwable e) { // wrap in exception to explain where it failed subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e)); @@ -655,7 +675,11 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor // must catch any exceptions from aggregation try { - doAggregate(getAggregationStrategy(subExchange), result, subExchange); + if (parallelAggregate) { + doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange); + } else { + doAggregate(getAggregationStrategy(subExchange), result, subExchange); + } } catch (Throwable e) { // wrap in exception to explain where it failed subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e)); @@ -800,6 +824,19 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor * @param exchange the exchange to be added to the result */ protected synchronized void doAggregate(AggregationStrategy strategy, AtomicExchange result, Exchange exchange) { + doAggregateInternal(strategy, result, exchange); + } + + /** + * Aggregate the {@link Exchange) with the current result. + * This method is unsynchronized and is called directly when parallelAggregate is enabled. + * In all other cases, this method is called from the doAggregate which is a synchronized method + * + * @param strategy + * @param result + * @param exchange + */ + protected void doAggregateInternal(AggregationStrategy strategy, AtomicExchange result, Exchange exchange) { if (strategy != null) { // prepare the exchanges for aggregation Exchange oldExchange = result.get(); http://git-wip-us.apache.org/repos/asf/camel/blob/f1dbb485/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java index c6c162e..ee80672 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java @@ -57,6 +57,7 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor { private Expression expression; private final String delimiter; private boolean parallelProcessing; + private boolean parallelAggregate; private boolean stopOnException; private boolean ignoreInvalidEndpoints; private boolean streaming; @@ -133,7 +134,7 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor { RecipientListProcessor rlp = new RecipientListProcessor(exchange.getContext(), producerCache, iter, getAggregationStrategy(), isParallelProcessing(), getExecutorService(), isShutdownExecutorService(), - isStreaming(), isStopOnException(), getTimeout(), getOnPrepare(), isShareUnitOfWork()) { + isStreaming(), isStopOnException(), getTimeout(), getOnPrepare(), isShareUnitOfWork(), isParallelAggregate()) { @Override protected synchronized ExecutorService createAggregateExecutorService(String name) { // use a shared executor service to avoid creating new thread pools @@ -226,6 +227,14 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor { this.parallelProcessing = parallelProcessing; } + public boolean isParallelAggregate() { + return parallelAggregate; + } + + public void setParallelAggregate(boolean parallelAggregate) { + this.parallelAggregate = parallelAggregate; + } + public boolean isStopOnException() { return stopOnException; } http://git-wip-us.apache.org/repos/asf/camel/blob/f1dbb485/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java index f538e36..70a3853 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java @@ -131,11 +131,21 @@ public class RecipientListProcessor extends MulticastProcessor { this.iter = iter; } + @Deprecated public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter, AggregationStrategy aggregationStrategy, boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork) { super(camelContext, null, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, - streaming, stopOnException, timeout, onPrepare, shareUnitOfWork); + streaming, stopOnException, timeout, onPrepare, shareUnitOfWork, false); + this.producerCache = producerCache; + this.iter = iter; + } + + public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter, AggregationStrategy aggregationStrategy, + boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, + boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork, boolean parallelAggregate) { + super(camelContext, null, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, + streaming, stopOnException, timeout, onPrepare, shareUnitOfWork, parallelAggregate); this.producerCache = producerCache; this.iter = iter; } http://git-wip-us.apache.org/repos/asf/camel/blob/f1dbb485/camel-core/src/main/java/org/apache/camel/processor/Splitter.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java index d1a0f64..a0b4a2a 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java +++ b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java @@ -63,11 +63,20 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac this(camelContext, expression, destination, aggregationStrategy, false, null, false, false, false, 0, null, false); } + @Deprecated public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy, boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean useSubUnitOfWork) { + this(camelContext, expression, destination, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, + streaming, stopOnException, timeout, onPrepare, useSubUnitOfWork, false); + } + + public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy, + boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, + boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean useSubUnitOfWork, + boolean parallelAggregate) { super(camelContext, Collections.singleton(destination), aggregationStrategy, parallelProcessing, executorService, - shutdownExecutorService, streaming, stopOnException, timeout, onPrepare, useSubUnitOfWork); + shutdownExecutorService, streaming, stopOnException, timeout, onPrepare, useSubUnitOfWork, parallelAggregate); this.expression = expression; notNull(expression, "expression"); notNull(destination, "destination"); http://git-wip-us.apache.org/repos/asf/camel/blob/f1dbb485/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelAggregateTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelAggregateTest.java b/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelAggregateTest.java new file mode 100644 index 0000000..91ac0b2 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelAggregateTest.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.io.File; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Future; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.aggregate.AggregationStrategy; +import org.apache.camel.util.StopWatch; + +public class SplitterParallelAggregateTest extends ContextTestSupport { + + // run this test manually as it takes some time to process, but shows that parallel aggregate can + // be faster when enabled. + private boolean enabled; + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:splitSynchronizedAggregation") + .split(method(new MySplitter(), "rowIterator"), new MyAggregationStrategy()) + .to("log:someSplitProcessing?groupSize=500"); + + from("direct:splitUnsynchronizedAggregation") + .split(method(new MySplitter(), "rowIterator"), new MyAggregationStrategy()).parallelAggregate() + .to("log:someSplitProcessing?groupSize=500"); + } + }; + } + + public void test1() throws Exception { + if (!enabled) { + return; + } + int numberOfRequests = 1; + timeSplitRoutes(numberOfRequests); + } + + public void test2() throws Exception { + if (!enabled) { + return; + } + int numberOfRequests = 2; + timeSplitRoutes(numberOfRequests); + } + + public void test4() throws Exception { + if (!enabled) { + return; + } + int numberOfRequests = 4; + timeSplitRoutes(numberOfRequests); + } + + protected void timeSplitRoutes(int numberOfRequests) throws Exception { + String[] endpoints = new String[]{"direct:splitSynchronizedAggregation", "direct:splitUnsynchronizedAggregation"}; + List<Future<File>> futures = new ArrayList<Future<File>>(); + StopWatch stopWatch = new StopWatch(false); + + for (int endpointIndex = 0; endpointIndex < endpoints.length; endpointIndex++) { + stopWatch.restart(); + for (int requestIndex = 0; requestIndex < numberOfRequests; requestIndex++) { + futures.add(template.asyncRequestBody( + endpoints[endpointIndex], null, File.class)); + } + + for (int i = 0; i < futures.size(); i++) { + Future<File> future = futures.get(i); + future.get(); + } + stopWatch.stop(); + + log.info(String.format("test%d.%s=%d\n", numberOfRequests, endpoints[endpointIndex], stopWatch.taken())); + } + } + + public static class MySplitter { + public Iterator<String[]> rowIterator() { + // we would normally be reading a large file but for this test, + // we'll just manufacture a bunch of string + // arrays + LinkedList<String[]> rows = new LinkedList<String[]>(); + String[] row; + for (int i = 0; i < 10000; i++) { + row = new String[10]; + for (int j = 0; j < row.length; j++) { + row[j] = String.valueOf(System.nanoTime()); + } + rows.add(row); + } + + return rows.iterator(); + } + } + + public static class MyAggregationStrategy implements AggregationStrategy { + + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + + // emulate some processing + Random random = new Random(System.currentTimeMillis()); + for (int i = 0; i < 10000; i++) { + random.nextGaussian(); + } + + return newExchange; + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/f1dbb485/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java b/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java index 420af1d..9e0fd6f 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java @@ -164,6 +164,26 @@ public class SplitterTest extends ContextTestSupport { assertEquals((Integer) 5, result.getProperty("aggregated", Integer.class)); } + public void testSplitterParallelAggregate() throws Exception { + MockEndpoint resultEndpoint = getMockEndpoint("mock:result"); + resultEndpoint.expectedMessageCount(5); + resultEndpoint.expectedBodiesReceivedInAnyOrder("James", "Guillaume", "Hiram", "Rob", "Roman"); + + Exchange result = template.request("direct:parallelAggregate", new Processor() { + public void process(Exchange exchange) { + Message in = exchange.getIn(); + in.setBody("James,Guillaume,Hiram,Rob,Roman"); + in.setHeader("foo", "bar"); + } + }); + + assertMockEndpointsSatisfied(); + Message out = result.getOut(); + + assertMessageHeader(out, "foo", "bar"); + assertEquals((Integer) 5, result.getProperty("aggregated", Integer.class)); + } + public void testSplitterWithStreamingAndFileBody() throws Exception { URL url = this.getClass().getResource("/org/apache/camel/processor/simple.txt"); assertNotNull("We should find this simple file here.", url); @@ -250,6 +270,7 @@ public class SplitterTest extends ContextTestSupport { from("direct:seqential").split(body().tokenize(","), new UseLatestAggregationStrategy()).to("mock:result"); from("direct:parallel").split(body().tokenize(","), new MyAggregationStrategy()).parallelProcessing().to("mock:result"); + from("direct:parallelAggregate").split(body().tokenize(","), new MyAggregationStrategy()).parallelProcessing().parallelAggregate().to("mock:result"); from("direct:streaming").split(body().tokenize(",")).streaming().to("mock:result"); from("direct:parallel-streaming").split(body().tokenize(","), new MyAggregationStrategy()).parallelProcessing().streaming().to("mock:result"); from("direct:exception")
