Repository: camel Updated Branches: refs/heads/master 9935da13f -> 443aa99bd
CAMEL-9061: Java DSL allow to configure parallel processing using a boolean. Allow to configure route options using a property placeholder as you can do in XML DSL. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/443aa99b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/443aa99b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/443aa99b Branch: refs/heads/master Commit: 443aa99bd50585140eb2de6ee15c2a9aab39c221 Parents: 9935da1 Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Aug 7 16:01:08 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Aug 7 16:01:08 2015 +0200 ---------------------------------------------------------------------- .../apache/camel/builder/BuilderSupport.java | 2 +- .../apache/camel/model/AggregateDefinition.java | 11 ++++++ .../apache/camel/model/MulticastDefinition.java | 12 +++++++ .../camel/model/OnCompletionDefinition.java | 11 ++++++ .../camel/model/RecipientListDefinition.java | 12 +++++++ .../org/apache/camel/model/RouteDefinition.java | 35 +++++++++++++++++++- .../org/apache/camel/model/SplitDefinition.java | 12 +++++++ 7 files changed, 93 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/443aa99b/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java b/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java index cdaf71f..45aed36 100644 --- a/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java +++ b/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java @@ -47,7 +47,7 @@ public abstract class BuilderSupport { } protected BuilderSupport(CamelContext context) { - this.context = (ModelCamelContext)context; + 
this.context = context.adapt(ModelCamelContext.class); } // Builder methods http://git-wip-us.apache.org/repos/asf/camel/blob/443aa99b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java index b369f1e..4b89a55 100644 --- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java @@ -44,6 +44,7 @@ import org.apache.camel.processor.aggregate.OptimisticLockRetryPolicy; import org.apache.camel.spi.AggregationRepository; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.RouteContext; +import org.apache.camel.util.CamelContextHelper; import org.apache.camel.util.concurrent.SynchronousExecutorService; /** @@ -876,6 +877,16 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition } /** + * When aggregated exchanges are completed they are being sent out of the aggregator. + * This option indicates whether or not Camel should use a thread pool with multiple threads for concurrency. + * If no custom thread pool has been specified then Camel creates a default pool with 10 concurrent threads. + */ + public AggregateDefinition parallelProcessing(boolean parallelProcessing) { + setParallelProcessing(parallelProcessing); + return this; + } + + /** * Turns on using optimistic locking, which requires the aggregationRepository being used, * is supporting this by implementing {@link org.apache.camel.spi.OptimisticLockingAggregationRepository}. 
*/ http://git-wip-us.apache.org/repos/asf/camel/blob/443aa99b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java index c9e958a..2e7e76c 100644 --- a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java @@ -156,6 +156,18 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i } /** + * If enabled then sending messages to the multicasts occurs concurrently. + * Note the caller thread will still wait until all messages have been fully processed, before it continues. + * It is only the sending and processing of the replies from the multicasts which happens concurrently. + * + * @return the builder + */ + public MulticastDefinition parallelProcessing(boolean parallelProcessing) { + setParallelProcessing(parallelProcessing); + return this; + } + + /** * If enabled then the aggregate method on AggregationStrategy can be called concurrently. * Notice that this would require the implementation of AggregationStrategy to be implemented as thread-safe. * By default this is false meaning that Camel synchronizes the call to the aggregate method. 
http://git-wip-us.apache.org/repos/asf/camel/blob/443aa99b/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java b/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java index c44d0d0..e9d4ab6 100644 --- a/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java @@ -291,6 +291,17 @@ public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefi return this; } + /** + * If enabled then the on completion process will run asynchronously by a separate thread from a thread pool. + * By default this is false, meaning the on completion process will run synchronously using the same caller thread as from the route. + * + * @return the builder + */ + public OnCompletionDefinition parallelProcessing(boolean parallelProcessing) { + setParallelProcessing(parallelProcessing); + return this; + } + public List<ProcessorDefinition<?>> getOutputs() { return outputs; } http://git-wip-us.apache.org/repos/asf/camel/blob/443aa99b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java index c983cf3..49d75f9 100644 --- a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java @@ -290,6 +290,18 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext } /** + * If enabled then sending messages to the recipients occurs concurrently. 
+ * Note the caller thread will still wait until all messages have been fully processed, before it continues. + * It is only the sending and processing of the replies from the recipients which happens concurrently. + * + * @return the builder + */ + public RecipientListDefinition<Type> parallelProcessing(boolean parallelProcessing) { + setParallelProcessing(parallelProcessing); + return this; + } + + /** * If enabled then the aggregate method on AggregationStrategy can be called concurrently. * Notice that this would require the implementation of AggregationStrategy to be implemented as thread-safe. * By default this is false meaning that Camel synchronizes the call to the aggregate method. http://git-wip-us.apache.org/repos/asf/camel/blob/443aa99b/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java b/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java index 54e7e55..1780992 100644 --- a/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java @@ -397,6 +397,17 @@ public class RouteDefinition extends ProcessorDefinition<RouteDefinition> { } /** + * Enable stream caching for this route. + * + * @param streamCache whether to use stream caching (true or false), the value can be a property placeholder + * @return the builder + */ + public RouteDefinition streamCaching(String streamCache) { + setStreamCache(streamCache); + return this; + } + + /** * Disable tracing for this route. * * @return the builder @@ -417,6 +428,17 @@ public class RouteDefinition extends ProcessorDefinition<RouteDefinition> { } /** + * Enable tracing for this route. 
+ * + * @param tracing whether to use tracing (true or false), the value can be a property placeholder + * @return the builder + */ + public RouteDefinition tracing(String tracing) { + setTrace(tracing); + return this; + } + + /** * Enable message history for this route. * * @return the builder @@ -427,6 +449,17 @@ public class RouteDefinition extends ProcessorDefinition<RouteDefinition> { } /** + * Enable message history for this route. + * + * @param messageHistory whether to use message history (true or false), the value can be a property placeholder + * @return the builder + */ + public RouteDefinition messageHistory(String messageHistory) { + setMessageHistory(messageHistory); + return this; + } + + /** * Disable message history for this route. * * @return the builder @@ -503,7 +536,7 @@ public class RouteDefinition extends ProcessorDefinition<RouteDefinition> { /** * Sets the auto startup property on this route. * - * @param autoStartup - String indicator ("true" or "false") + * @param autoStartup whether to auto startup (true or false), the value can be a property placeholder * @return the builder */ public RouteDefinition autoStartup(String autoStartup) { http://git-wip-us.apache.org/repos/asf/camel/blob/443aa99b/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java b/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java index 01e4153..21654ac 100644 --- a/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java @@ -209,6 +209,18 @@ public class SplitDefinition extends ExpressionNode implements ExecutorServiceAw } /** + * If enabled then processing each split message occurs concurrently. 
+ * Note the caller thread will still wait until all messages have been fully processed, before it continues. + * It is only the processing of the sub messages from the splitter which happens concurrently. + * + * @return the builder + */ + public SplitDefinition parallelProcessing(boolean parallelProcessing) { + setParallelProcessing(parallelProcessing); + return this; + } + + /** * If enabled then the aggregate method on AggregationStrategy can be called concurrently. * Notice that this would require the implementation of AggregationStrategy to be implemented as thread-safe. * By default this is false meaning that Camel synchronizes the call to the aggregate method.