This is an automated email from the ASF dual-hosted git repository. gnodet pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 52451f1d13a9a873f633f82721c9bfae08a80aa9 Author: Guillaume Nodet <gno...@gmail.com> AuthorDate: Mon Feb 10 15:22:17 2020 +0100 Move a few methods from ProcessorDefinitionHelper to ProcessorDefinition --- .../component/resilience4j/ResilienceReifier.java | 3 +- .../camel/model/ProcessorDefinitionHelper.java | 259 ------------------- .../org/apache/camel/reifier/AggregateReifier.java | 21 +- .../org/apache/camel/reifier/DelayReifier.java | 5 +- .../apache/camel/reifier/LoadBalanceReifier.java | 4 +- .../org/apache/camel/reifier/MulticastReifier.java | 5 +- .../apache/camel/reifier/OnCompletionReifier.java | 5 +- .../apache/camel/reifier/OnExceptionReifier.java | 2 +- .../org/apache/camel/reifier/ProcessorReifier.java | 287 +++++++++++++++++++-- .../apache/camel/reifier/RecipientListReifier.java | 7 +- .../org/apache/camel/reifier/RouteReifier.java | 5 +- .../org/apache/camel/reifier/SplitReifier.java | 5 +- .../org/apache/camel/reifier/ThreadsReifier.java | 5 +- .../org/apache/camel/reifier/ThrottleReifier.java | 5 +- .../org/apache/camel/reifier/WireTapReifier.java | 7 +- .../camel/reifier/rest/RestBindingReifier.java | 10 +- 16 files changed, 308 insertions(+), 327 deletions(-) diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java index ffefa52..6eb6c27 100644 --- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java +++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java @@ -31,7 +31,6 @@ import org.apache.camel.ExtendedCamelContext; import org.apache.camel.Processor; import org.apache.camel.model.CircuitBreakerDefinition; import org.apache.camel.model.Model; -import org.apache.camel.model.ProcessorDefinitionHelper; import 
org.apache.camel.model.Resilience4jConfigurationCommon; import org.apache.camel.model.Resilience4jConfigurationDefinition; import org.apache.camel.reifier.ProcessorReifier; @@ -152,7 +151,7 @@ public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition boolean shutdownThreadPool = false; ExecutorService executorService = routeContext.lookup(ref, ExecutorService.class); if (executorService == null) { - executorService = ProcessorDefinitionHelper.lookupExecutorServiceRef(routeContext, "CircuitBreaker", definition, ref); + executorService = lookupExecutorServiceRef("CircuitBreaker", definition, ref); shutdownThreadPool = true; } processor.setExecutorService(executorService); diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java b/core/camel-core-engine/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java index da812b8..1996c1f 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java @@ -24,24 +24,17 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; import java.util.function.Consumer; import java.util.function.Supplier; -import javax.xml.namespace.QName; - import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.ExchangeConstantProvider; import org.apache.camel.NamedNode; import org.apache.camel.RuntimeCamelException; -import org.apache.camel.spi.ExecutorServiceManager; import org.apache.camel.spi.PropertiesComponent; import org.apache.camel.spi.PropertyPlaceholderConfigurer; -import org.apache.camel.spi.RouteContext; import org.apache.camel.support.PropertyBindingSupport; -import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.StringHelper; 
import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -369,258 +362,6 @@ public final class ProcessorDefinitionHelper { } /** - * Is there any outputs in the given list. - * <p/> - * Is used for check if the route output has any real outputs (non - * abstracts) - * - * @param outputs the outputs - * @param excludeAbstract whether or not to exclude abstract outputs (e.g. - * skip onException etc.) - * @return <tt>true</tt> if has outputs, otherwise <tt>false</tt> is - * returned - */ - @SuppressWarnings({"unchecked", "rawtypes"}) - public static boolean hasOutputs(List<ProcessorDefinition<?>> outputs, boolean excludeAbstract) { - if (outputs == null || outputs.isEmpty()) { - return false; - } - if (!excludeAbstract) { - return !outputs.isEmpty(); - } - for (ProcessorDefinition output : outputs) { - if (output.isWrappingEntireOutput()) { - // special for those as they wrap entire output, so we should - // just check its output - return hasOutputs(output.getOutputs(), excludeAbstract); - } - if (!output.isAbstract()) { - return true; - } - } - return false; - } - - /** - * Determines whether a new thread pool will be created or not. - * <p/> - * This is used to know if a new thread pool will be created, and therefore - * is not shared by others, and therefore exclusive to the definition. - * - * @param routeContext the route context - * @param definition the node definition which may leverage executor - * service. 
- * @param useDefault whether to fallback and use a default thread pool, if - * no explicit configured - * @return <tt>true</tt> if a new thread pool will be created, - * <tt>false</tt> if not - * @see #getConfiguredExecutorService(org.apache.camel.spi.RouteContext, - * String, ExecutorServiceAwareDefinition, boolean) - */ - public static boolean willCreateNewThreadPool(RouteContext routeContext, ExecutorServiceAwareDefinition<?> definition, boolean useDefault) { - ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager(); - ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext()); - - if (definition.getExecutorService() != null) { - // no there is a custom thread pool configured - return false; - } else if (definition.getExecutorServiceRef() != null) { - ExecutorService answer = routeContext.lookup(definition.getExecutorServiceRef(), ExecutorService.class); - // if no existing thread pool, then we will have to create a new - // thread pool - return answer == null; - } else if (useDefault) { - return true; - } - - return false; - } - - /** - * Will lookup in {@link org.apache.camel.spi.Registry} for a - * {@link ExecutorService} registered with the given - * <tt>executorServiceRef</tt> name. - * <p/> - * This method will lookup for configured thread pool in the following order - * <ul> - * <li>from the {@link org.apache.camel.spi.Registry} if found</li> - * <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile - * ThreadPoolProfile(s)}.</li> - * <li>if none found, then <tt>null</tt> is returned.</li> - * </ul> - * - * @param routeContext the route context - * @param name name which is appended to the thread name, when the - * {@link java.util.concurrent.ExecutorService} is created based - * on a {@link org.apache.camel.spi.ThreadPoolProfile}. 
- * @param source the source to use the thread pool - * @param executorServiceRef reference name of the thread pool - * @return the executor service, or <tt>null</tt> if none was found. - */ - public static ExecutorService lookupExecutorServiceRef(RouteContext routeContext, String name, Object source, String executorServiceRef) { - - ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager(); - ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext()); - ObjectHelper.notNull(executorServiceRef, "executorServiceRef"); - - // lookup in registry first and use existing thread pool if exists - ExecutorService answer = routeContext.lookup(executorServiceRef, ExecutorService.class); - if (answer == null) { - // then create a thread pool assuming the ref is a thread pool - // profile id - answer = manager.newThreadPool(source, name, executorServiceRef); - } - return answer; - } - - /** - * Will lookup and get the configured - * {@link java.util.concurrent.ExecutorService} from the given definition. - * <p/> - * This method will lookup for configured thread pool in the following order - * <ul> - * <li>from the definition if any explicit configured executor service.</li> - * <li>from the {@link org.apache.camel.spi.Registry} if found</li> - * <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile - * ThreadPoolProfile(s)}.</li> - * <li>if none found, then <tt>null</tt> is returned.</li> - * </ul> - * The various {@link ExecutorServiceAwareDefinition} should use this helper - * method to ensure they support configured executor services in the same - * coherent way. - * - * @param routeContext the route context - * @param name name which is appended to the thread name, when the - * {@link java.util.concurrent.ExecutorService} is created based - * on a {@link org.apache.camel.spi.ThreadPoolProfile}. - * @param definition the node definition which may leverage executor - * service. 
- * @param useDefault whether to fallback and use a default thread pool, if - * no explicit configured - * @return the configured executor service, or <tt>null</tt> if none was - * configured. - * @throws IllegalArgumentException is thrown if lookup of executor service - * in {@link org.apache.camel.spi.Registry} was not found - */ - public static ExecutorService getConfiguredExecutorService(RouteContext routeContext, String name, ExecutorServiceAwareDefinition<?> definition, boolean useDefault) - throws IllegalArgumentException { - ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager(); - ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext()); - - // prefer to use explicit configured executor on the definition - if (definition.getExecutorService() != null) { - return definition.getExecutorService(); - } else if (definition.getExecutorServiceRef() != null) { - // lookup in registry first and use existing thread pool if exists - ExecutorService answer = lookupExecutorServiceRef(routeContext, name, definition, definition.getExecutorServiceRef()); - if (answer == null) { - throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() - + " not found in registry (as an ExecutorService instance) or as a thread pool profile."); - } - return answer; - } else if (useDefault) { - return manager.newDefaultThreadPool(definition, name); - } - - return null; - } - - /** - * Will lookup in {@link org.apache.camel.spi.Registry} for a - * {@link ScheduledExecutorService} registered with the given - * <tt>executorServiceRef</tt> name. 
- * <p/> - * This method will lookup for configured thread pool in the following order - * <ul> - * <li>from the {@link org.apache.camel.spi.Registry} if found</li> - * <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile - * ThreadPoolProfile(s)}.</li> - * <li>if none found, then <tt>null</tt> is returned.</li> - * </ul> - * - * @param routeContext the route context - * @param name name which is appended to the thread name, when the - * {@link java.util.concurrent.ExecutorService} is created based - * on a {@link org.apache.camel.spi.ThreadPoolProfile}. - * @param source the source to use the thread pool - * @param executorServiceRef reference name of the thread pool - * @return the executor service, or <tt>null</tt> if none was found. - */ - public static ScheduledExecutorService lookupScheduledExecutorServiceRef(RouteContext routeContext, String name, Object source, String executorServiceRef) { - - ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager(); - ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext()); - ObjectHelper.notNull(executorServiceRef, "executorServiceRef"); - - // lookup in registry first and use existing thread pool if exists - ScheduledExecutorService answer = routeContext.lookup(executorServiceRef, ScheduledExecutorService.class); - if (answer == null) { - // then create a thread pool assuming the ref is a thread pool - // profile id - answer = manager.newScheduledThreadPool(source, name, executorServiceRef); - } - return answer; - } - - /** - * Will lookup and get the configured - * {@link java.util.concurrent.ScheduledExecutorService} from the given - * definition. 
- * <p/> - * This method will lookup for configured thread pool in the following order - * <ul> - * <li>from the definition if any explicit configured executor service.</li> - * <li>from the {@link org.apache.camel.spi.Registry} if found</li> - * <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile - * ThreadPoolProfile(s)}.</li> - * <li>if none found, then <tt>null</tt> is returned.</li> - * </ul> - * The various {@link ExecutorServiceAwareDefinition} should use this helper - * method to ensure they support configured executor services in the same - * coherent way. - * - * @param routeContext the rout context - * @param name name which is appended to the thread name, when the - * {@link java.util.concurrent.ExecutorService} is created based - * on a {@link org.apache.camel.spi.ThreadPoolProfile}. - * @param definition the node definition which may leverage executor - * service. - * @param useDefault whether to fallback and use a default thread pool, if - * no explicit configured - * @return the configured executor service, or <tt>null</tt> if none was - * configured. 
- * @throws IllegalArgumentException is thrown if the found instance is not a - * ScheduledExecutorService type, or lookup of executor service - * in {@link org.apache.camel.spi.Registry} was not found - */ - public static ScheduledExecutorService getConfiguredScheduledExecutorService(RouteContext routeContext, String name, ExecutorServiceAwareDefinition<?> definition, - boolean useDefault) - throws IllegalArgumentException { - ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager(); - ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext()); - - // prefer to use explicit configured executor on the definition - if (definition.getExecutorService() != null) { - ExecutorService executorService = definition.getExecutorService(); - if (executorService instanceof ScheduledExecutorService) { - return (ScheduledExecutorService)executorService; - } - throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " is not an ScheduledExecutorService instance"); - } else if (definition.getExecutorServiceRef() != null) { - ScheduledExecutorService answer = lookupScheduledExecutorServiceRef(routeContext, name, definition, definition.getExecutorServiceRef()); - if (answer == null) { - throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() - + " not found in registry (as an ScheduledExecutorService instance) or as a thread pool profile."); - } - return answer; - } else if (useDefault) { - return manager.newDefaultScheduledThreadPool(definition, name); - } - - return null; - } - - /** * The RestoreAction is used to track all the undo/restore actions that need * to be performed to undo any resolution to property placeholders that have * been applied to the camel route defs. 
This class is private so it does diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/AggregateReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/AggregateReifier.java index defa9af..81f129f 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/AggregateReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/AggregateReifier.java @@ -20,7 +20,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import org.apache.camel.AggregationStrategy; -import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; import org.apache.camel.Expression; import org.apache.camel.Predicate; @@ -28,7 +27,6 @@ import org.apache.camel.Processor; import org.apache.camel.model.AggregateDefinition; import org.apache.camel.model.OptimisticLockRetryPolicyDefinition; import org.apache.camel.model.ProcessorDefinition; -import org.apache.camel.model.ProcessorDefinitionHelper; import org.apache.camel.processor.CamelInternalProcessor; import org.apache.camel.processor.aggregate.AggregateController; import org.apache.camel.processor.aggregate.AggregateProcessor; @@ -36,7 +34,6 @@ import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; import org.apache.camel.processor.aggregate.OptimisticLockRetryPolicy; import org.apache.camel.spi.AggregationRepository; import org.apache.camel.spi.RouteContext; -import org.apache.camel.support.CamelContextHelper; import org.apache.camel.util.concurrent.SynchronousExecutorService; public class AggregateReifier extends ProcessorReifier<AggregateDefinition> { @@ -61,8 +58,8 @@ public class AggregateReifier extends ProcessorReifier<AggregateDefinition> { AggregationStrategy strategy = createAggregationStrategy(routeContext); boolean parallel = parseBoolean(definition.getParallelProcessing()); - boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, 
definition, parallel); - ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Aggregator", definition, parallel); + boolean shutdownThreadPool = willCreateNewThreadPool(definition, parallel); + ExecutorService threadPool = getConfiguredExecutorService("Aggregator", definition, parallel); if (threadPool == null && !parallel) { // executor service is mandatory for the Aggregator // we do not run in parallel mode, but use a synchronous executor, @@ -169,7 +166,7 @@ public class AggregateReifier extends ProcessorReifier<AggregateDefinition> { } if (definition.getOptimisticLockRetryPolicy() == null) { if (definition.getOptimisticLockRetryPolicyDefinition() != null) { - answer.setOptimisticLockRetryPolicy(createOptimisticLockRetryPolicy(camelContext, definition.getOptimisticLockRetryPolicyDefinition())); + answer.setOptimisticLockRetryPolicy(createOptimisticLockRetryPolicy(definition.getOptimisticLockRetryPolicyDefinition())); } } else { answer.setOptimisticLockRetryPolicy(definition.getOptimisticLockRetryPolicy()); @@ -183,22 +180,22 @@ public class AggregateReifier extends ProcessorReifier<AggregateDefinition> { return answer; } - public static OptimisticLockRetryPolicy createOptimisticLockRetryPolicy(CamelContext camelContext, OptimisticLockRetryPolicyDefinition definition) { + public OptimisticLockRetryPolicy createOptimisticLockRetryPolicy(OptimisticLockRetryPolicyDefinition definition) { OptimisticLockRetryPolicy policy = new OptimisticLockRetryPolicy(); if (definition.getMaximumRetries() != null) { - policy.setMaximumRetries(CamelContextHelper.parseInt(camelContext, definition.getMaximumRetries())); + policy.setMaximumRetries(parseInt(definition.getMaximumRetries())); } if (definition.getRetryDelay() != null) { - policy.setRetryDelay(CamelContextHelper.parseLong(camelContext, definition.getRetryDelay())); + policy.setRetryDelay(parseLong(definition.getRetryDelay())); } if (definition.getMaximumRetryDelay() != null) { - 
policy.setMaximumRetryDelay(CamelContextHelper.parseLong(camelContext, definition.getMaximumRetryDelay())); + policy.setMaximumRetryDelay(parseLong(definition.getMaximumRetryDelay())); } if (definition.getExponentialBackOff() != null) { - policy.setExponentialBackOff(CamelContextHelper.parseBoolean(camelContext, definition.getExponentialBackOff())); + policy.setExponentialBackOff(parseBoolean(definition.getExponentialBackOff())); } if (definition.getRandomBackOff() != null) { - policy.setRandomBackOff(CamelContextHelper.parseBoolean(camelContext, definition.getRandomBackOff())); + policy.setRandomBackOff(parseBoolean(definition.getRandomBackOff())); } return policy; } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/DelayReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/DelayReifier.java index 044069b..69bcd4f 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/DelayReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/DelayReifier.java @@ -22,7 +22,6 @@ import org.apache.camel.Expression; import org.apache.camel.Processor; import org.apache.camel.model.DelayDefinition; import org.apache.camel.model.ProcessorDefinition; -import org.apache.camel.model.ProcessorDefinitionHelper; import org.apache.camel.model.language.ExpressionDefinition; import org.apache.camel.processor.Delayer; import org.apache.camel.spi.RouteContext; @@ -39,8 +38,8 @@ public class DelayReifier extends ExpressionReifier<DelayDefinition> { Expression delay = createAbsoluteTimeDelayExpression(routeContext); boolean async = definition.getAsyncDelayed() == null || Boolean.parseBoolean(definition.getAsyncDelayed()); - boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, definition, async); - ScheduledExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredScheduledExecutorService(routeContext, "Delay", definition, async); + boolean 
shutdownThreadPool = willCreateNewThreadPool(definition, async); + ScheduledExecutorService threadPool = getConfiguredScheduledExecutorService("Delay", definition, async); Delayer answer = new Delayer(camelContext, childProcessor, delay, threadPool, shutdownThreadPool); answer.setAsyncDelayed(async); diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/LoadBalanceReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/LoadBalanceReifier.java index 0ed74e3..72c3d03 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/LoadBalanceReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/LoadBalanceReifier.java @@ -58,7 +58,7 @@ public class LoadBalanceReifier extends ProcessorReifier<LoadBalanceDefinition> throw new IllegalArgumentException("Loadbalancer already configured to: " + definition.getLoadBalancerType() + ". Cannot set it to: " + processorType); } Processor processor = createProcessor(processorType); - Channel channel = wrapChannel(routeContext, processor, processorType); + Channel channel = wrapChannel(processor, processorType); loadBalancer.addProcessor(channel); } } @@ -71,7 +71,7 @@ public class LoadBalanceReifier extends ProcessorReifier<LoadBalanceDefinition> // handler can react afterwards inherit = true; } - Processor target = wrapChannel(routeContext, loadBalancer, definition, inherit); + Processor target = wrapChannel(loadBalancer, definition, inherit); return target; } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/MulticastReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/MulticastReifier.java index 71a7b4c..e66b2dd 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/MulticastReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/MulticastReifier.java @@ -25,7 +25,6 @@ import org.apache.camel.CamelContextAware; import org.apache.camel.Processor; import 
org.apache.camel.model.MulticastDefinition; import org.apache.camel.model.ProcessorDefinition; -import org.apache.camel.model.ProcessorDefinitionHelper; import org.apache.camel.processor.MulticastProcessor; import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy; @@ -64,8 +63,8 @@ public class MulticastReifier extends ProcessorReifier<MulticastDefinition> { boolean isParallelAggregate = definition.getParallelAggregate() != null && parseBoolean(definition.getParallelAggregate()); boolean isStopOnAggregateException = definition.getStopOnAggregateException() != null && parseBoolean(definition.getStopOnAggregateException()); - boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, definition, isParallelProcessing); - ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Multicast", definition, isParallelProcessing); + boolean shutdownThreadPool = willCreateNewThreadPool(definition, isParallelProcessing); + ExecutorService threadPool = getConfiguredExecutorService("Multicast", definition, isParallelProcessing); long timeout = definition.getTimeout() != null ? 
parseLong(definition.getTimeout()) : 0; if (timeout > 0 && !isParallelProcessing) { diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java index bc71d5b..eb47895 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java @@ -23,7 +23,6 @@ import org.apache.camel.Processor; import org.apache.camel.model.OnCompletionDefinition; import org.apache.camel.model.OnCompletionMode; import org.apache.camel.model.ProcessorDefinition; -import org.apache.camel.model.ProcessorDefinitionHelper; import org.apache.camel.processor.CamelInternalProcessor; import org.apache.camel.processor.OnCompletionProcessor; import org.apache.camel.spi.RouteContext; @@ -76,8 +75,8 @@ public class OnCompletionReifier extends ProcessorReifier<OnCompletionDefinition when = definition.getOnWhen().getExpression().createPredicate(routeContext); } - boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, definition, isParallelProcessing); - ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "OnCompletion", definition, isParallelProcessing); + boolean shutdownThreadPool = willCreateNewThreadPool(definition, isParallelProcessing); + ExecutorService threadPool = getConfiguredExecutorService("OnCompletion", definition, isParallelProcessing); // should be after consumer by default boolean afterConsumer = definition.getMode() == null || definition.getMode() == OnCompletionMode.AfterConsumer; diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/OnExceptionReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/OnExceptionReifier.java index 57a3425..5f94c72 100644 --- 
a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/OnExceptionReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/OnExceptionReifier.java @@ -38,7 +38,7 @@ public class OnExceptionReifier extends ProcessorReifier<OnExceptionDefinition> } @Override - public void addRoutes(RouteContext routeContext) throws Exception { + public void addRoutes() throws Exception { // assign whether this was a route scoped onException or not // we need to know this later when setting the parent, as only route // scoped should have parent diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java index d7627ed..0e0bd5c 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java @@ -21,6 +21,8 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import java.util.function.BiFunction; import org.apache.camel.CamelContext; @@ -38,6 +40,7 @@ import org.apache.camel.model.ConvertBodyDefinition; import org.apache.camel.model.DelayDefinition; import org.apache.camel.model.DynamicRouterDefinition; import org.apache.camel.model.EnrichDefinition; +import org.apache.camel.model.ExecutorServiceAwareDefinition; import org.apache.camel.model.ExpressionNode; import org.apache.camel.model.FilterDefinition; import org.apache.camel.model.FinallyDefinition; @@ -103,11 +106,13 @@ import org.apache.camel.processor.InterceptEndpointProcessor; import org.apache.camel.processor.Pipeline; import org.apache.camel.processor.channel.DefaultChannel; import org.apache.camel.reifier.errorhandler.ErrorHandlerReifier; +import org.apache.camel.spi.ExecutorServiceManager; import 
org.apache.camel.spi.IdAware; import org.apache.camel.spi.InterceptStrategy; import org.apache.camel.spi.LifecycleStrategy; import org.apache.camel.spi.RouteContext; import org.apache.camel.spi.RouteIdAware; +import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -212,6 +217,253 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends } /** + * Determines whether a new thread pool will be created or not. + * <p/> + * This is used to know if a new thread pool will be created, and therefore + * is not shared by others, and therefore exclusive to the definition. + * + * @param definition the node definition which may leverage executor + * service. + * @param useDefault whether to fallback and use a default thread pool, if + * no explicit configured + * @return <tt>true</tt> if a new thread pool will be created, + * <tt>false</tt> if not + * @see #getConfiguredExecutorService(String, ExecutorServiceAwareDefinition, boolean) + */ + public boolean willCreateNewThreadPool(ExecutorServiceAwareDefinition<?> definition, boolean useDefault) { + ExecutorServiceManager manager = camelContext.getExecutorServiceManager(); + ObjectHelper.notNull(manager, "ExecutorServiceManager", camelContext); + + if (definition.getExecutorService() != null) { + // no there is a custom thread pool configured + return false; + } else if (definition.getExecutorServiceRef() != null) { + ExecutorService answer = routeContext.lookup(definition.getExecutorServiceRef(), ExecutorService.class); + // if no existing thread pool, then we will have to create a new + // thread pool + return answer == null; + } else if (useDefault) { + return true; + } + + return false; + } + + /** + * Will lookup and get the configured + * {@link ExecutorService} from the given definition. 
+ * <p/> + * This method will lookup for configured thread pool in the following order + * <ul> + * <li>from the definition if any explicit configured executor service.</li> + * <li>from the {@link org.apache.camel.spi.Registry} if found</li> + * <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile + * ThreadPoolProfile(s)}.</li> + * <li>if none found, then <tt>null</tt> is returned.</li> + * </ul> + * The various {@link ExecutorServiceAwareDefinition} should use this helper + * method to ensure they support configured executor services in the same + * coherent way. + * + * @param name name which is appended to the thread name, when the + * {@link ExecutorService} is created based + * on a {@link org.apache.camel.spi.ThreadPoolProfile}. + * @param definition the node definition which may leverage executor + * service. + * @param useDefault whether to fallback and use a default thread pool, if + * no explicit configured + * @return the configured executor service, or <tt>null</tt> if none was + * configured. 
+ * @throws IllegalArgumentException is thrown if lookup of executor service + * in {@link org.apache.camel.spi.Registry} was not found + */ + public ExecutorService getConfiguredExecutorService(String name, ExecutorServiceAwareDefinition<?> definition, boolean useDefault) + throws IllegalArgumentException { + ExecutorServiceManager manager = camelContext.getExecutorServiceManager(); + ObjectHelper.notNull(manager, "ExecutorServiceManager", camelContext); + + // prefer to use explicit configured executor on the definition + if (definition.getExecutorService() != null) { + return definition.getExecutorService(); + } else if (definition.getExecutorServiceRef() != null) { + // lookup in registry first and use existing thread pool if exists + ExecutorService answer = lookupExecutorServiceRef(name, definition, definition.getExecutorServiceRef()); + if (answer == null) { + throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + + " not found in registry (as an ExecutorService instance) or as a thread pool profile."); + } + return answer; + } else if (useDefault) { + return manager.newDefaultThreadPool(definition, name); + } + + return null; + } + + /** + * Will lookup and get the configured + * {@link java.util.concurrent.ScheduledExecutorService} from the given + * definition. + * <p/> + * This method will lookup for configured thread pool in the following order + * <ul> + * <li>from the definition if any explicit configured executor service.</li> + * <li>from the {@link org.apache.camel.spi.Registry} if found</li> + * <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile + * ThreadPoolProfile(s)}.</li> + * <li>if none found, then <tt>null</tt> is returned.</li> + * </ul> + * The various {@link ExecutorServiceAwareDefinition} should use this helper + * method to ensure they support configured executor services in the same + * coherent way. 
+ * + * @param name name which is appended to the thread name, when the + * {@link ExecutorService} is created based + * on a {@link org.apache.camel.spi.ThreadPoolProfile}. + * @param definition the node definition which may leverage executor + * service. + * @param useDefault whether to fallback and use a default thread pool, if + * no explicit configured + * @return the configured executor service, or <tt>null</tt> if none was + * configured. + * @throws IllegalArgumentException is thrown if the found instance is not a + * ScheduledExecutorService type, or lookup of executor service + * in {@link org.apache.camel.spi.Registry} was not found + */ + public ScheduledExecutorService getConfiguredScheduledExecutorService(String name, ExecutorServiceAwareDefinition<?> definition, + boolean useDefault) + throws IllegalArgumentException { + ExecutorServiceManager manager = camelContext.getExecutorServiceManager(); + ObjectHelper.notNull(manager, "ExecutorServiceManager", camelContext); + + // prefer to use explicit configured executor on the definition + if (definition.getExecutorService() != null) { + ExecutorService executorService = definition.getExecutorService(); + if (executorService instanceof ScheduledExecutorService) { + return (ScheduledExecutorService)executorService; + } + throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " is not an ScheduledExecutorService instance"); + } else if (definition.getExecutorServiceRef() != null) { + ScheduledExecutorService answer = lookupScheduledExecutorServiceRef(name, definition, definition.getExecutorServiceRef()); + if (answer == null) { + throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + + " not found in registry (as an ScheduledExecutorService instance) or as a thread pool profile."); + } + return answer; + } else if (useDefault) { + return manager.newDefaultScheduledThreadPool(definition, name); + } + + return null; + } + + /** 
+ * Will lookup in {@link org.apache.camel.spi.Registry} for a + * {@link ScheduledExecutorService} registered with the given + * <tt>executorServiceRef</tt> name. + * <p/> + * This method will lookup for configured thread pool in the following order + * <ul> + * <li>from the {@link org.apache.camel.spi.Registry} if found</li> + * <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile + * ThreadPoolProfile(s)}.</li> + * <li>if none found, then <tt>null</tt> is returned.</li> + * </ul> + * + * @param name name which is appended to the thread name, when the + * {@link ExecutorService} is created based + * on a {@link org.apache.camel.spi.ThreadPoolProfile}. + * @param source the source to use the thread pool + * @param executorServiceRef reference name of the thread pool + * @return the executor service, or <tt>null</tt> if none was found. + */ + public ScheduledExecutorService lookupScheduledExecutorServiceRef(String name, Object source, String executorServiceRef) { + + ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager(); + ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext()); + ObjectHelper.notNull(executorServiceRef, "executorServiceRef"); + + // lookup in registry first and use existing thread pool if exists + ScheduledExecutorService answer = routeContext.lookup(executorServiceRef, ScheduledExecutorService.class); + if (answer == null) { + // then create a thread pool assuming the ref is a thread pool + // profile id + answer = manager.newScheduledThreadPool(source, name, executorServiceRef); + } + return answer; + } + + /** + * Will lookup in {@link org.apache.camel.spi.Registry} for a + * {@link ExecutorService} registered with the given + * <tt>executorServiceRef</tt> name. 
+ * <p/> + * This method will lookup for configured thread pool in the following order + * <ul> + * <li>from the {@link org.apache.camel.spi.Registry} if found</li> + * <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile + * ThreadPoolProfile(s)}.</li> + * <li>if none found, then <tt>null</tt> is returned.</li> + * </ul> + * + * @param name name which is appended to the thread name, when the + * {@link ExecutorService} is created based + * on a {@link org.apache.camel.spi.ThreadPoolProfile}. + * @param source the source to use the thread pool + * @param executorServiceRef reference name of the thread pool + * @return the executor service, or <tt>null</tt> if none was found. + */ + public ExecutorService lookupExecutorServiceRef(String name, Object source, String executorServiceRef) { + + ExecutorServiceManager manager = camelContext.getExecutorServiceManager(); + ObjectHelper.notNull(manager, "ExecutorServiceManager", camelContext); + ObjectHelper.notNull(executorServiceRef, "executorServiceRef"); + + // lookup in registry first and use existing thread pool if exists + ExecutorService answer = routeContext.lookup(executorServiceRef, ExecutorService.class); + if (answer == null) { + // then create a thread pool assuming the ref is a thread pool + // profile id + answer = manager.newThreadPool(source, name, executorServiceRef); + } + return answer; + } + + /** + * Is there any outputs in the given list. + * <p/> + * Is used for check if the route output has any real outputs (non + * abstracts) + * + * @param outputs the outputs + * @param excludeAbstract whether or not to exclude abstract outputs (e.g. + * skip onException etc.) 
+ * @return <tt>true</tt> if has outputs, otherwise <tt>false</tt> is + * returned + */ + @SuppressWarnings({"unchecked", "rawtypes"}) + public boolean hasOutputs(List<ProcessorDefinition<?>> outputs, boolean excludeAbstract) { + if (outputs == null || outputs.isEmpty()) { + return false; + } + if (!excludeAbstract) { + return true; + } + for (ProcessorDefinition output : outputs) { + if (output.isWrappingEntireOutput()) { + // special for those as they wrap entire output, so we should + // just check its output + return hasOutputs(output.getOutputs(), excludeAbstract); + } + if (!output.isAbstract()) { + return true; + } + } + return false; + } + + /** * Override this in definition class and implement logic to create the * processor based on the definition model. */ @@ -253,7 +505,7 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends return children; } - public void addRoutes(RouteContext routeContext) throws Exception { + public void addRoutes() throws Exception { Channel processor = makeProcessor(); if (processor == null) { // no processor to add @@ -280,19 +532,19 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends * Wraps the child processor in whatever necessary interceptors and error * handlers */ - public Channel wrapProcessor(RouteContext routeContext, Processor processor) throws Exception { + public Channel wrapProcessor(Processor processor) throws Exception { // don't double wrap if (processor instanceof Channel) { return (Channel)processor; } - return wrapChannel(routeContext, processor, null); + return wrapChannel(processor, null); } - protected Channel wrapChannel(RouteContext routeContext, Processor processor, ProcessorDefinition<?> child) throws Exception { - return wrapChannel(routeContext, processor, child, definition.isInheritErrorHandler()); + protected Channel wrapChannel(Processor processor, ProcessorDefinition<?> child) throws Exception { + return wrapChannel(processor, child, 
definition.isInheritErrorHandler()); } - protected Channel wrapChannel(RouteContext routeContext, Processor processor, ProcessorDefinition<?> child, Boolean inheritErrorHandler) throws Exception { + protected Channel wrapChannel(Processor processor, ProcessorDefinition<?> child, Boolean inheritErrorHandler) throws Exception { // put a channel in between this and each output to control the route // flow logic DefaultChannel channel = new DefaultChannel(camelContext); @@ -300,9 +552,9 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends // add interceptor strategies to the channel must be in this order: // camel context, route context, local List<InterceptStrategy> interceptors = new ArrayList<>(); - addInterceptStrategies(routeContext, interceptors, camelContext.adapt(ExtendedCamelContext.class).getInterceptStrategies()); - addInterceptStrategies(routeContext, interceptors, routeContext.getInterceptStrategies()); - addInterceptStrategies(routeContext, interceptors, definition.getInterceptStrategies()); + addInterceptStrategies(interceptors, camelContext.adapt(ExtendedCamelContext.class).getInterceptStrategies()); + addInterceptStrategies(interceptors, routeContext.getInterceptStrategies()); + addInterceptStrategies(interceptors, definition.getInterceptStrategies()); // force the creation of an id RouteDefinitionHelper.forceAssignIds(camelContext, definition); @@ -385,7 +637,7 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends wrap = true; } if (wrap) { - wrapChannelInErrorHandler(channel, routeContext, inheritErrorHandler); + wrapChannelInErrorHandler(channel, inheritErrorHandler); } // do post init at the end @@ -399,15 +651,14 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends * Wraps the given channel in error handler (if error handler is inherited) * * @param channel the channel - * @param routeContext the route context * @param inheritErrorHandler whether to inherit 
error handler * @throws Exception can be thrown if failed to create error handler builder */ - private void wrapChannelInErrorHandler(DefaultChannel channel, RouteContext routeContext, Boolean inheritErrorHandler) throws Exception { + private void wrapChannelInErrorHandler(DefaultChannel channel, Boolean inheritErrorHandler) throws Exception { if (inheritErrorHandler == null || inheritErrorHandler) { log.trace("{} is configured to inheritErrorHandler", definition); Processor output = channel.getOutput(); - Processor errorHandler = wrapInErrorHandler(routeContext, output); + Processor errorHandler = wrapInErrorHandler(output); // set error handler on channel channel.setErrorHandler(errorHandler); } else { @@ -418,12 +669,11 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends /** * Wraps the given output in an error handler * - * @param routeContext the route context * @param output the output * @return the output wrapped with the error handler * @throws Exception can be thrown if failed to create error handler builder */ - protected Processor wrapInErrorHandler(RouteContext routeContext, Processor output) throws Exception { + protected Processor wrapInErrorHandler(Processor output) throws Exception { ErrorHandlerFactory builder = routeContext.getErrorHandlerFactory(); // create error handler Processor errorHandler = ErrorHandlerReifier.reifier(routeContext, builder).createErrorHandler(output); @@ -439,11 +689,10 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends /** * Adds the given list of interceptors to the channel. * - * @param routeContext the route context * @param interceptors the list to add strategies * @param strategies list of strategies to add. 
*/ - protected void addInterceptStrategies(RouteContext routeContext, List<InterceptStrategy> interceptors, List<InterceptStrategy> strategies) { + protected void addInterceptStrategies(List<InterceptStrategy> interceptors, List<InterceptStrategy> strategies) { interceptors.addAll(strategies); } @@ -503,7 +752,7 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends continue; } - Processor channel = wrapChannel(routeContext, processor, output); + Processor channel = wrapChannel(processor, output); list.add(channel); } @@ -594,7 +843,7 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends // no processor to make return null; } - return wrapProcessor(routeContext, processor); + return wrapProcessor(processor); } /** diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RecipientListReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RecipientListReifier.java index 950815b..68cc91c 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RecipientListReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RecipientListReifier.java @@ -25,7 +25,6 @@ import org.apache.camel.CamelContextAware; import org.apache.camel.Expression; import org.apache.camel.Processor; import org.apache.camel.model.ProcessorDefinition; -import org.apache.camel.model.ProcessorDefinitionHelper; import org.apache.camel.model.RecipientListDefinition; import org.apache.camel.processor.EvaluateExpressionProcessor; import org.apache.camel.processor.Pipeline; @@ -81,8 +80,8 @@ public class RecipientListReifier extends ProcessorReifier<RecipientListDefiniti answer.setTimeout(parseLong(definition.getTimeout())); } - boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, definition, isParallelProcessing); - ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "RecipientList", 
definition, isParallelProcessing); + boolean shutdownThreadPool = willCreateNewThreadPool(definition, isParallelProcessing); + ExecutorService threadPool = getConfiguredExecutorService("RecipientList", definition, isParallelProcessing); answer.setExecutorService(threadPool); answer.setShutdownExecutorService(shutdownThreadPool); long timeout = definition.getTimeout() != null ? parseLong(definition.getTimeout()) : 0; @@ -102,7 +101,7 @@ public class RecipientListReifier extends ProcessorReifier<RecipientListDefiniti // special error handling // when sending to the recipients individually Processor evalProcessor = new EvaluateExpressionProcessor(expression); - evalProcessor = super.wrapInErrorHandler(routeContext, evalProcessor); + evalProcessor = super.wrapInErrorHandler(evalProcessor); pipe.add(evalProcessor); pipe.add(answer); diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RouteReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RouteReifier.java index 04b972b..cd20c51 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RouteReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RouteReifier.java @@ -34,7 +34,6 @@ import org.apache.camel.builder.EndpointConsumerBuilder; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.model.Model; import org.apache.camel.model.ProcessorDefinition; -import org.apache.camel.model.ProcessorDefinitionHelper; import org.apache.camel.model.PropertyDefinition; import org.apache.camel.model.RouteDefinition; import org.apache.camel.model.RoutesDefinition; @@ -374,7 +373,7 @@ public class RouteReifier extends ProcessorReifier<RouteDefinition> { } // validate route has output processors - if (!ProcessorDefinitionHelper.hasOutputs(definition.getOutputs(), true)) { + if (!hasOutputs(definition.getOutputs(), true)) { String at = definition.getInput().toString(); Exception cause = new IllegalArgumentException("Route " + 
definition.getId() + " has no output processors." + " You need to add outputs to the route such as to(\"log:foo\")."); @@ -384,7 +383,7 @@ public class RouteReifier extends ProcessorReifier<RouteDefinition> { List<ProcessorDefinition<?>> list = new ArrayList<>(definition.getOutputs()); for (ProcessorDefinition<?> output : list) { try { - ProcessorReifier.reifier(routeContext, output).addRoutes(routeContext); + ProcessorReifier.reifier(routeContext, output).addRoutes(); } catch (Exception e) { throw new FailedToCreateRouteException(definition.getId(), definition.toString(), output.toString(), e); } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/SplitReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/SplitReifier.java index 8d1bda4..d678766 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/SplitReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/SplitReifier.java @@ -23,7 +23,6 @@ import org.apache.camel.CamelContextAware; import org.apache.camel.Expression; import org.apache.camel.Processor; import org.apache.camel.model.ProcessorDefinition; -import org.apache.camel.model.ProcessorDefinitionHelper; import org.apache.camel.model.SplitDefinition; import org.apache.camel.processor.Splitter; import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; @@ -47,8 +46,8 @@ public class SplitReifier extends ExpressionReifier<SplitDefinition> { boolean isShareUnitOfWork = definition.getShareUnitOfWork() != null && definition.getShareUnitOfWork(); boolean isParallelAggregate = definition.getParallelAggregate() != null && definition.getParallelAggregate(); boolean isStopOnAggregateException = definition.getStopOnAggregateException() != null && definition.getStopOnAggregateException(); - boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, definition, isParallelProcessing); - ExecutorService threadPool = 
ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Split", definition, isParallelProcessing); + boolean shutdownThreadPool = willCreateNewThreadPool(definition, isParallelProcessing); + ExecutorService threadPool = getConfiguredExecutorService("Split", definition, isParallelProcessing); long timeout = definition.getTimeout() != null ? definition.getTimeout() : 0; if (timeout > 0 && !isParallelProcessing) { diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ThreadsReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ThreadsReifier.java index f66a43f..8137a12 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ThreadsReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ThreadsReifier.java @@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit; import org.apache.camel.Processor; import org.apache.camel.builder.ThreadPoolProfileBuilder; import org.apache.camel.model.ProcessorDefinition; -import org.apache.camel.model.ProcessorDefinitionHelper; import org.apache.camel.model.ThreadsDefinition; import org.apache.camel.processor.ThreadsProcessor; import org.apache.camel.spi.ExecutorServiceManager; @@ -41,8 +40,8 @@ public class ThreadsReifier extends ProcessorReifier<ThreadsDefinition> { // the threads name String name = definition.getThreadName() != null ? 
definition.getThreadName() : "Threads"; // prefer any explicit configured executor service - boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, definition, true); - ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, name, definition, false); + boolean shutdownThreadPool = willCreateNewThreadPool(definition, true); + ExecutorService threadPool = getConfiguredExecutorService(name, definition, false); // resolve what rejected policy to use ThreadPoolRejectedPolicy policy = resolveRejectedPolicy(routeContext); diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ThrottleReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ThrottleReifier.java index fb7f853..f4c3dc6 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ThrottleReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ThrottleReifier.java @@ -21,7 +21,6 @@ import java.util.concurrent.ScheduledExecutorService; import org.apache.camel.Expression; import org.apache.camel.Processor; import org.apache.camel.model.ProcessorDefinition; -import org.apache.camel.model.ProcessorDefinitionHelper; import org.apache.camel.model.ThrottleDefinition; import org.apache.camel.model.language.ExpressionDefinition; import org.apache.camel.processor.Throttler; @@ -36,8 +35,8 @@ public class ThrottleReifier extends ExpressionReifier<ThrottleDefinition> { @Override public Processor createProcessor() throws Exception { boolean async = definition.getAsyncDelayed() != null && definition.getAsyncDelayed(); - boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, definition, true); - ScheduledExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredScheduledExecutorService(routeContext, "Throttle", definition, true); + boolean shutdownThreadPool = willCreateNewThreadPool(definition, true); + 
ScheduledExecutorService threadPool = getConfiguredScheduledExecutorService("Throttle", definition, true); // should be default 1000 millis long period = definition.getTimePeriodMillis() != null ? definition.getTimePeriodMillis() : 1000L; diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/WireTapReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/WireTapReifier.java index 267eeed..b720c1a 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/WireTapReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/WireTapReifier.java @@ -23,7 +23,6 @@ import org.apache.camel.Expression; import org.apache.camel.Processor; import org.apache.camel.builder.ExpressionBuilder; import org.apache.camel.model.ProcessorDefinition; -import org.apache.camel.model.ProcessorDefinitionHelper; import org.apache.camel.model.SetHeaderDefinition; import org.apache.camel.model.WireTapDefinition; import org.apache.camel.processor.CamelInternalProcessor; @@ -41,8 +40,8 @@ public class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> { @Override public Processor createProcessor() throws Exception { // executor service is mandatory for wire tap - boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, definition, true); - ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "WireTap", definition, true); + boolean shutdownThreadPool = willCreateNewThreadPool(definition, true); + ExecutorService threadPool = getConfiguredExecutorService("WireTap", definition, true); // must use InOnly for WireTap definition.setPattern(ExchangePattern.InOnly.name()); @@ -51,7 +50,7 @@ public class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> { SendDynamicProcessor dynamicTo = (SendDynamicProcessor)super.createProcessor(); // create error handler we need to use for processing the wire tapped - Processor target = 
wrapInErrorHandler(routeContext, dynamicTo); + Processor target = wrapInErrorHandler(dynamicTo); // and wrap in unit of work CamelInternalProcessor internal = new CamelInternalProcessor(camelContext, target); diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/rest/RestBindingReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/rest/RestBindingReifier.java index 46671c2..bb1c68d 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/rest/RestBindingReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/rest/RestBindingReifier.java @@ -160,7 +160,7 @@ public class RestBindingReifier extends AbstractReifier { } if (clazz != null) { JAXBContext jc = JAXBContext.newInstance(clazz); - camelContext.adapt(ExtendedCamelContext.class).getBeanIntrospection().setProperty(camelContext, jaxb, "context", jc); + setJaxbContext(jaxb, jc); } setAdditionalConfiguration(config, jaxb, "xml.in."); @@ -171,15 +171,19 @@ public class RestBindingReifier extends AbstractReifier { } if (outClazz != null) { JAXBContext jc = JAXBContext.newInstance(outClazz); - camelContext.adapt(ExtendedCamelContext.class).getBeanIntrospection().setProperty(camelContext, outJaxb, "context", jc); + setJaxbContext(outJaxb, jc); } else if (clazz != null) { // fallback and use the context from the input JAXBContext jc = JAXBContext.newInstance(clazz); - camelContext.adapt(ExtendedCamelContext.class).getBeanIntrospection().setProperty(camelContext, outJaxb, "context", jc); + setJaxbContext(outJaxb, jc); } setAdditionalConfiguration(config, outJaxb, "xml.out."); } + private void setJaxbContext(DataFormat jaxb, JAXBContext jc) throws Exception { + camelContext.adapt(ExtendedCamelContext.class).getBeanIntrospection().setProperty(camelContext, jaxb, "context", jc); + } + private void setAdditionalConfiguration(RestConfiguration config, DataFormat dataFormat, String prefix) throws Exception { if 
(config.getDataFormatProperties() != null && !config.getDataFormatProperties().isEmpty()) { // must use a copy as otherwise the options gets removed during