This is an automated email from the ASF dual-hosted git repository. gnodet pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 67ff8e3a7a06340f01dcd9499c30790339325a08 Author: Guillaume Nodet <[email protected]> AuthorDate: Wed Feb 19 12:29:49 2020 +0100 Move RouteContext lookup methods to AbstractReifier --- .../JtaTransactionErrorHandlerBuilder.java | 8 +- .../hystrix/processor/HystrixReifier.java | 65 ++++++------ .../component/resilience4j/ResilienceReifier.java | 29 +++--- .../spring/spi/TransactionErrorHandlerBuilder.java | 14 +-- .../spring/spi/TransactionErrorHandlerReifier.java | 15 ++- .../apache/camel/spring/EndpointReferenceTest.java | 4 +- .../java/org/apache/camel/spi/RouteContext.java | 43 -------- .../camel/impl/engine/DefaultRouteContext.java | 92 ++++------------ .../camel/builder/DeadLetterChannelBuilder.java | 8 +- ...ctedReifier.java => AbstractPolicyReifier.java} | 62 +++-------- .../org/apache/camel/reifier/AbstractReifier.java | 54 +++++++++- .../org/apache/camel/reifier/AggregateReifier.java | 12 +-- .../apache/camel/reifier/ClaimCheckReifier.java | 6 +- .../org/apache/camel/reifier/EnrichReifier.java | 2 +- .../camel/reifier/IdempotentConsumerReifier.java | 2 +- .../java/org/apache/camel/reifier/LogReifier.java | 11 +- .../org/apache/camel/reifier/MulticastReifier.java | 5 +- .../org/apache/camel/reifier/PolicyReifier.java | 12 +-- .../apache/camel/reifier/PollEnrichReifier.java | 2 +- .../org/apache/camel/reifier/ProcessReifier.java | 2 +- .../org/apache/camel/reifier/ProcessorReifier.java | 11 +- .../apache/camel/reifier/RecipientListReifier.java | 10 +- .../apache/camel/reifier/ResequenceReifier.java | 3 +- .../org/apache/camel/reifier/RouteReifier.java | 25 ++--- .../java/org/apache/camel/reifier/SagaReifier.java | 10 +- .../java/org/apache/camel/reifier/SendReifier.java | 9 +- .../java/org/apache/camel/reifier/SortReifier.java | 2 +- .../org/apache/camel/reifier/SplitReifier.java | 5 +- .../camel/reifier/ThrowExceptionReifier.java | 3 +- .../apache/camel/reifier/TransactedReifier.java | 116 ++------------------- .../org/apache/camel/reifier/WireTapReifier.java | 5 +- .../reifier/dataformat/DataFormatReifier.java | 5 +- .../errorhandler/DefaultErrorHandlerReifier.java | 2 +- .../reifier/errorhandler/ErrorHandlerReifier.java | 9 +- .../loadbalancer/CustomLoadBalancerReifier.java | 3 +- .../camel/reifier/rest/RestBindingReifier.java | 4 +- .../transformer/CustomTransformeReifier.java | 2 +- .../transformer/EndpointTransformeReifier.java | 2 +- .../reifier/validator/CustomValidatorReifier.java | 2 +- .../validator/EndpointValidatorReifier.java | 2 +- .../apache/camel/support/CamelContextHelper.java | 34 +++++- 41 files changed, 274 insertions(+), 438 deletions(-) diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionErrorHandlerBuilder.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionErrorHandlerBuilder.java index f47928f..bb15f57 100644 --- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionErrorHandlerBuilder.java +++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionErrorHandlerBuilder.java @@ -77,12 +77,13 @@ public class JtaTransactionErrorHandlerBuilder extends DefaultErrorHandlerBuilde @Override public Processor createErrorHandler(final RouteContext routeContext, final Processor processor) throws Exception { + CamelContext camelContext = routeContext.getCamelContext(); // resolve policy reference, if given if (transactionPolicy == null) { if (policyRef != null) { final TransactedDefinition transactedDefinition = new TransactedDefinition(); transactedDefinition.setRef(policyRef); - final Policy policy = TransactedReifier.resolvePolicy(routeContext, transactedDefinition); + final Policy policy = new TransactedReifier(camelContext, transactedDefinition).resolvePolicy(); if (policy != null) { if (!(policy instanceof JtaTransactionPolicy)) { throw new RuntimeCamelException("The configured policy '" + policyRef + "' is of type '" @@ -99,7 +100,7 @@ public class JtaTransactionErrorHandlerBuilder extends DefaultErrorHandlerBuilde LOG.debug( "No transaction policy configured on TransactionErrorHandlerBuilder. Will try find it in the registry."); - Map<String, TransactedPolicy> mapPolicy = routeContext.lookupByType(TransactedPolicy.class); + Map<String, TransactedPolicy> mapPolicy = camelContext.getRegistry().findByTypeWithName(TransactedPolicy.class); if (mapPolicy != null && mapPolicy.size() == 1) { TransactedPolicy policy = mapPolicy.values().iterator().next(); if (policy instanceof JtaTransactionPolicy) { @@ -108,7 +109,7 @@ public class JtaTransactionErrorHandlerBuilder extends DefaultErrorHandlerBuilde } if (transactionPolicy == null) { - TransactedPolicy policy = routeContext.lookup(PROPAGATION_REQUIRED, TransactedPolicy.class); + TransactedPolicy policy = camelContext.getRegistry().lookupByNameAndType(PROPAGATION_REQUIRED, TransactedPolicy.class); if (policy instanceof JtaTransactionPolicy) { transactionPolicy = (JtaTransactionPolicy) policy; } @@ -121,7 +122,6 @@ public class JtaTransactionErrorHandlerBuilder extends DefaultErrorHandlerBuilde ObjectHelper.notNull(transactionPolicy, "transactionPolicy", this); - final CamelContext camelContext = routeContext.getCamelContext(); final Map<String, String> properties = camelContext.getGlobalOptions(); if ((properties != null) && properties.containsKey(ROLLBACK_LOGGING_LEVEL_PROPERTY)) { rollbackLoggingLevel = LoggingLevel.valueOf(properties.get(ROLLBACK_LOGGING_LEVEL_PROPERTY)); diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixReifier.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixReifier.java index c03b2ad..10d535a 100644 --- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixReifier.java +++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixReifier.java @@ -38,9 +38,6 @@ import org.apache.camel.spi.RouteContext; import org.apache.camel.support.PropertyBindingSupport; import org.apache.camel.util.function.Suppliers; -import static org.apache.camel.support.CamelContextHelper.lookup; -import static org.apache.camel.support.CamelContextHelper.mandatoryLookup; - public class HystrixReifier extends ProcessorReifier<CircuitBreakerDefinition> { public HystrixReifier(RouteContext routeContext, CircuitBreakerDefinition definition) { @@ -119,91 +116,91 @@ public class HystrixReifier extends ProcessorReifier<CircuitBreakerDefinition> { private void configureHystrix(HystrixCommandProperties.Setter command, HystrixThreadPoolProperties.Setter threadPool, HystrixConfigurationDefinition config) { // command if (config.getCircuitBreakerEnabled() != null) { - command.withCircuitBreakerEnabled(Boolean.parseBoolean(config.getCircuitBreakerEnabled())); + command.withCircuitBreakerEnabled(parseBoolean(config.getCircuitBreakerEnabled())); } if (config.getCircuitBreakerErrorThresholdPercentage() != null) { - command.withCircuitBreakerErrorThresholdPercentage(Integer.parseInt(config.getCircuitBreakerErrorThresholdPercentage())); + command.withCircuitBreakerErrorThresholdPercentage(parseInt(config.getCircuitBreakerErrorThresholdPercentage())); } if (config.getCircuitBreakerForceClosed() != null) { - command.withCircuitBreakerForceClosed(Boolean.parseBoolean(config.getCircuitBreakerForceClosed())); + command.withCircuitBreakerForceClosed(parseBoolean(config.getCircuitBreakerForceClosed())); } if (config.getCircuitBreakerForceOpen() != null) { - command.withCircuitBreakerForceOpen(Boolean.parseBoolean(config.getCircuitBreakerForceOpen())); + command.withCircuitBreakerForceOpen(parseBoolean(config.getCircuitBreakerForceOpen())); } if (config.getCircuitBreakerRequestVolumeThreshold() != null) { - command.withCircuitBreakerRequestVolumeThreshold(Integer.parseInt(config.getCircuitBreakerRequestVolumeThreshold())); + command.withCircuitBreakerRequestVolumeThreshold(parseInt(config.getCircuitBreakerRequestVolumeThreshold())); } if (config.getCircuitBreakerSleepWindowInMilliseconds() != null) { - command.withCircuitBreakerSleepWindowInMilliseconds(Integer.parseInt(config.getCircuitBreakerSleepWindowInMilliseconds())); + command.withCircuitBreakerSleepWindowInMilliseconds(parseInt(config.getCircuitBreakerSleepWindowInMilliseconds())); } if (config.getExecutionIsolationSemaphoreMaxConcurrentRequests() != null) { - command.withExecutionIsolationSemaphoreMaxConcurrentRequests(Integer.parseInt(config.getExecutionIsolationSemaphoreMaxConcurrentRequests())); + command.withExecutionIsolationSemaphoreMaxConcurrentRequests(parseInt(config.getExecutionIsolationSemaphoreMaxConcurrentRequests())); } if (config.getExecutionIsolationStrategy() != null) { - command.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.valueOf(config.getExecutionIsolationStrategy())); + command.withExecutionIsolationStrategy(parse(HystrixCommandProperties.ExecutionIsolationStrategy.class, config.getExecutionIsolationStrategy())); } if (config.getExecutionIsolationThreadInterruptOnTimeout() != null) { - command.withExecutionIsolationThreadInterruptOnTimeout(Boolean.parseBoolean(config.getExecutionIsolationThreadInterruptOnTimeout())); + command.withExecutionIsolationThreadInterruptOnTimeout(parseBoolean(config.getExecutionIsolationThreadInterruptOnTimeout())); } if (config.getExecutionTimeoutInMilliseconds() != null) { - command.withExecutionTimeoutInMilliseconds(Integer.parseInt(config.getExecutionTimeoutInMilliseconds())); + command.withExecutionTimeoutInMilliseconds(parseInt(config.getExecutionTimeoutInMilliseconds())); } if (config.getExecutionTimeoutEnabled() != null) { - command.withExecutionTimeoutEnabled(Boolean.parseBoolean(config.getExecutionTimeoutEnabled())); + command.withExecutionTimeoutEnabled(parseBoolean(config.getExecutionTimeoutEnabled())); } if (config.getFallbackIsolationSemaphoreMaxConcurrentRequests() != null) { - command.withFallbackIsolationSemaphoreMaxConcurrentRequests(Integer.parseInt(config.getFallbackIsolationSemaphoreMaxConcurrentRequests())); + command.withFallbackIsolationSemaphoreMaxConcurrentRequests(parseInt(config.getFallbackIsolationSemaphoreMaxConcurrentRequests())); } if (config.getFallbackEnabled() != null) { - command.withFallbackEnabled(Boolean.parseBoolean(config.getFallbackEnabled())); + command.withFallbackEnabled(parseBoolean(config.getFallbackEnabled())); } if (config.getMetricsHealthSnapshotIntervalInMilliseconds() != null) { - command.withMetricsHealthSnapshotIntervalInMilliseconds(Integer.parseInt(config.getMetricsHealthSnapshotIntervalInMilliseconds())); + command.withMetricsHealthSnapshotIntervalInMilliseconds(parseInt(config.getMetricsHealthSnapshotIntervalInMilliseconds())); } if (config.getMetricsRollingPercentileBucketSize() != null) { - command.withMetricsRollingPercentileBucketSize(Integer.parseInt(config.getMetricsRollingPercentileBucketSize())); + command.withMetricsRollingPercentileBucketSize(parseInt(config.getMetricsRollingPercentileBucketSize())); } if (config.getMetricsRollingPercentileEnabled() != null) { - command.withMetricsRollingPercentileEnabled(Boolean.parseBoolean(config.getMetricsRollingPercentileEnabled())); + command.withMetricsRollingPercentileEnabled(parseBoolean(config.getMetricsRollingPercentileEnabled())); } if (config.getMetricsRollingPercentileWindowInMilliseconds() != null) { - command.withMetricsRollingPercentileWindowInMilliseconds(Integer.parseInt(config.getMetricsRollingPercentileWindowInMilliseconds())); + command.withMetricsRollingPercentileWindowInMilliseconds(parseInt(config.getMetricsRollingPercentileWindowInMilliseconds())); } if (config.getMetricsRollingPercentileWindowBuckets() != null) { - command.withMetricsRollingPercentileWindowBuckets(Integer.parseInt(config.getMetricsRollingPercentileWindowBuckets())); + command.withMetricsRollingPercentileWindowBuckets(parseInt(config.getMetricsRollingPercentileWindowBuckets())); } if (config.getMetricsRollingStatisticalWindowInMilliseconds() != null) { - command.withMetricsRollingStatisticalWindowInMilliseconds(Integer.parseInt(config.getMetricsRollingStatisticalWindowInMilliseconds())); + command.withMetricsRollingStatisticalWindowInMilliseconds(parseInt(config.getMetricsRollingStatisticalWindowInMilliseconds())); } if (config.getMetricsRollingStatisticalWindowBuckets() != null) { - command.withMetricsRollingStatisticalWindowBuckets(Integer.parseInt(config.getMetricsRollingStatisticalWindowBuckets())); + command.withMetricsRollingStatisticalWindowBuckets(parseInt(config.getMetricsRollingStatisticalWindowBuckets())); } if (config.getRequestLogEnabled() != null) { - command.withRequestLogEnabled(Boolean.parseBoolean(config.getRequestLogEnabled())); + command.withRequestLogEnabled(parseBoolean(config.getRequestLogEnabled())); } if (config.getCorePoolSize() != null) { - threadPool.withCoreSize(Integer.parseInt(config.getCorePoolSize())); + threadPool.withCoreSize(parseInt(config.getCorePoolSize())); } if (config.getMaximumSize() != null) { - threadPool.withMaximumSize(Integer.parseInt(config.getMaximumSize())); + threadPool.withMaximumSize(parseInt(config.getMaximumSize())); } if (config.getKeepAliveTime() != null) { - threadPool.withKeepAliveTimeMinutes(Integer.parseInt(config.getKeepAliveTime())); + threadPool.withKeepAliveTimeMinutes(parseInt(config.getKeepAliveTime())); } if (config.getMaxQueueSize() != null) { - threadPool.withMaxQueueSize(Integer.parseInt(config.getMaxQueueSize())); + threadPool.withMaxQueueSize(parseInt(config.getMaxQueueSize())); } if (config.getQueueSizeRejectionThreshold() != null) { - threadPool.withQueueSizeRejectionThreshold(Integer.parseInt(config.getQueueSizeRejectionThreshold())); + threadPool.withQueueSizeRejectionThreshold(parseInt(config.getQueueSizeRejectionThreshold())); } if (config.getThreadPoolRollingNumberStatisticalWindowInMilliseconds() != null) { - threadPool.withMetricsRollingStatisticalWindowInMilliseconds(Integer.parseInt(config.getThreadPoolRollingNumberStatisticalWindowInMilliseconds())); + threadPool.withMetricsRollingStatisticalWindowInMilliseconds(parseInt(config.getThreadPoolRollingNumberStatisticalWindowInMilliseconds())); } if (config.getThreadPoolRollingNumberStatisticalWindowBuckets() != null) { - threadPool.withMetricsRollingStatisticalWindowBuckets(Integer.parseInt(config.getThreadPoolRollingNumberStatisticalWindowBuckets())); + threadPool.withMetricsRollingStatisticalWindowBuckets(parseInt(config.getThreadPoolRollingNumberStatisticalWindowBuckets())); } if (config.getAllowMaximumSizeToDivergeFromCoreSize() != null) { - threadPool.withAllowMaximumSizeToDivergeFromCoreSize(Boolean.parseBoolean(config.getAllowMaximumSizeToDivergeFromCoreSize())); + threadPool.withAllowMaximumSizeToDivergeFromCoreSize(parseBoolean(config.getAllowMaximumSizeToDivergeFromCoreSize())); } } @@ -218,7 +215,7 @@ public class HystrixReifier extends ProcessorReifier<CircuitBreakerDefinition> { // camel context takes the precedence over those in the registry loadProperties(camelContext, properties, Suppliers.firstNotNull( () -> camelContext.getExtension(Model.class).getHystrixConfiguration(null), - () -> lookup(camelContext, HystrixConstants.DEFAULT_HYSTRIX_CONFIGURATION_ID, HystrixConfigurationDefinition.class)) + () -> lookup(HystrixConstants.DEFAULT_HYSTRIX_CONFIGURATION_ID, HystrixConfigurationDefinition.class)) ); // Extract properties from referenced configuration, the one configured @@ -228,7 +225,7 @@ public class HystrixReifier extends ProcessorReifier<CircuitBreakerDefinition> { loadProperties(camelContext, properties, Suppliers.firstNotNull( () -> camelContext.getExtension(Model.class).getHystrixConfiguration(ref), - () -> mandatoryLookup(camelContext, ref, HystrixConfigurationDefinition.class)) + () -> mandatoryLookup(ref, HystrixConfigurationDefinition.class)) ); } diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java index bb7e2b1..f725db8 100644 --- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java +++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java @@ -26,7 +26,6 @@ import io.github.resilience4j.bulkhead.BulkheadConfig; import io.github.resilience4j.circuitbreaker.CircuitBreaker; import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; import io.github.resilience4j.timelimiter.TimeLimiterConfig; -import org.apache.camel.CamelContext; import org.apache.camel.ExtendedCamelContext; import org.apache.camel.Processor; import org.apache.camel.model.CircuitBreakerDefinition; @@ -36,13 +35,9 @@ import org.apache.camel.model.Resilience4jConfigurationDefinition; import org.apache.camel.reifier.ProcessorReifier; import org.apache.camel.spi.BeanIntrospection; import org.apache.camel.spi.RouteContext; -import org.apache.camel.support.CamelContextHelper; import org.apache.camel.support.PropertyBindingSupport; import org.apache.camel.util.function.Suppliers; -import static org.apache.camel.support.CamelContextHelper.lookup; -import static org.apache.camel.support.CamelContextHelper.mandatoryLookup; - public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition> { public ResilienceReifier(RouteContext routeContext, CircuitBreakerDefinition definition) { @@ -61,16 +56,16 @@ public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition if (fallbackViaNetwork) { throw new UnsupportedOperationException("camel-resilience4j does not support onFallbackViaNetwork"); } - final Resilience4jConfigurationCommon config = buildResilience4jConfiguration(routeContext.getCamelContext()); + final Resilience4jConfigurationCommon config = buildResilience4jConfiguration(); CircuitBreakerConfig cbConfig = configureCircuitBreaker(config); BulkheadConfig bhConfig = configureBulkHead(config); TimeLimiterConfig tlConfig = configureTimeLimiter(config); ResilienceProcessor answer = new ResilienceProcessor(cbConfig, bhConfig, tlConfig, processor, fallback); - configureTimeoutExecutorService(answer, routeContext, config); + configureTimeoutExecutorService(answer, config); // using any existing circuit breakers? if (config.getCircuitBreakerRef() != null) { - CircuitBreaker cb = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), parseString(config.getCircuitBreakerRef()), CircuitBreaker.class); + CircuitBreaker cb = mandatoryLookup(parseString(config.getCircuitBreakerRef()), CircuitBreaker.class); answer.setCircuitBreaker(cb); } return answer; @@ -141,7 +136,7 @@ public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition return builder.build(); } - private void configureTimeoutExecutorService(ResilienceProcessor processor, RouteContext routeContext, Resilience4jConfigurationCommon config) { + private void configureTimeoutExecutorService(ResilienceProcessor processor, Resilience4jConfigurationCommon config) { if (!parseBoolean(config.getTimeoutEnabled(), false)) { return; } @@ -149,7 +144,7 @@ public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition if (config.getTimeoutExecutorServiceRef() != null) { String ref = config.getTimeoutExecutorServiceRef(); boolean shutdownThreadPool = false; - ExecutorService executorService = routeContext.lookup(ref, ExecutorService.class); + ExecutorService executorService = lookup(ref, ExecutorService.class); if (executorService == null) { executorService = lookupExecutorServiceRef("CircuitBreaker", definition, ref); shutdownThreadPool = true; @@ -163,27 +158,27 @@ public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition // Helpers // ******************************* - Resilience4jConfigurationDefinition buildResilience4jConfiguration(CamelContext camelContext) throws Exception { + Resilience4jConfigurationDefinition buildResilience4jConfiguration() throws Exception { Map<String, Object> properties = new HashMap<>(); // Extract properties from default configuration, the one configured on // camel context takes the precedence over those in the registry - loadProperties(camelContext, properties, Suppliers.firstNotNull( + loadProperties(properties, Suppliers.firstNotNull( () -> camelContext.getExtension(Model.class).getResilience4jConfiguration(null), - () -> lookup(camelContext, ResilienceConstants.DEFAULT_RESILIENCE_CONFIGURATION_ID, Resilience4jConfigurationDefinition.class))); + () -> lookup(ResilienceConstants.DEFAULT_RESILIENCE_CONFIGURATION_ID, Resilience4jConfigurationDefinition.class))); // Extract properties from referenced configuration, the one configured // on camel context takes the precedence over those in the registry if (definition.getConfigurationRef() != null) { final String ref = definition.getConfigurationRef(); - loadProperties(camelContext, properties, Suppliers.firstNotNull( + loadProperties(properties, Suppliers.firstNotNull( () -> camelContext.getExtension(Model.class).getResilience4jConfiguration(ref), - () -> mandatoryLookup(camelContext, ref, Resilience4jConfigurationDefinition.class))); + () -> mandatoryLookup(ref, Resilience4jConfigurationDefinition.class))); } // Extract properties from local configuration - loadProperties(camelContext, properties, Optional.ofNullable(definition.getResilience4jConfiguration())); + loadProperties(properties, Optional.ofNullable(definition.getResilience4jConfiguration())); // Extract properties from definition BeanIntrospection beanIntrospection = camelContext.adapt(ExtendedCamelContext.class).getBeanIntrospection(); @@ -197,7 +192,7 @@ public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition return config; } - private void loadProperties(CamelContext camelContext, Map<String, Object> properties, Optional<?> optional) { + private void loadProperties(Map<String, Object> properties, Optional<?> optional) { BeanIntrospection beanIntrospection = camelContext.adapt(ExtendedCamelContext.class).getBeanIntrospection(); optional.ifPresent(bean -> beanIntrospection.getProperties(bean, properties, null, false)); } diff --git a/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerBuilder.java b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerBuilder.java index d527700..1c7fc85 100644 --- a/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerBuilder.java +++ b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerBuilder.java @@ -18,6 +18,7 @@ package org.apache.camel.spring.spi; import java.util.Map; +import org.apache.camel.CamelContext; import org.apache.camel.LoggingLevel; import org.apache.camel.Processor; import org.apache.camel.builder.DefaultErrorHandlerBuilder; @@ -60,11 +61,12 @@ public class TransactionErrorHandlerBuilder extends DefaultErrorHandlerBuilder { @Override public Processor createErrorHandler(RouteContext routeContext, Processor processor) throws Exception { + CamelContext camelContext = routeContext.getCamelContext(); if (transactionTemplate == null) { // lookup in context if no transaction template has been configured LOG.debug("No TransactionTemplate configured on TransactionErrorHandlerBuilder. Will try find it in the registry."); - Map<String, TransactedPolicy> mapPolicy = routeContext.lookupByType(TransactedPolicy.class); + Map<String, TransactedPolicy> mapPolicy = camelContext.getRegistry().findByTypeWithName(TransactedPolicy.class); if (mapPolicy != null && mapPolicy.size() == 1) { TransactedPolicy policy = mapPolicy.values().iterator().next(); if (policy instanceof SpringTransactionPolicy) { @@ -73,14 +75,14 @@ public class TransactionErrorHandlerBuilder extends DefaultErrorHandlerBuilder { } if (transactionTemplate == null) { - TransactedPolicy policy = routeContext.lookup(PROPAGATION_REQUIRED, TransactedPolicy.class); + TransactedPolicy policy = camelContext.getRegistry().lookupByNameAndType(PROPAGATION_REQUIRED, TransactedPolicy.class); if (policy instanceof SpringTransactionPolicy) { transactionTemplate = ((SpringTransactionPolicy) policy).getTransactionTemplate(); } } if (transactionTemplate == null) { - Map<String, TransactionTemplate> mapTemplate = routeContext.lookupByType(TransactionTemplate.class); + Map<String, TransactionTemplate> mapTemplate = camelContext.getRegistry().findByTypeWithName(TransactionTemplate.class); if (mapTemplate == null || mapTemplate.isEmpty()) { LOG.trace("No TransactionTemplate found in registry."); } else if (mapTemplate.size() == 1) { @@ -92,7 +94,7 @@ public class TransactionErrorHandlerBuilder extends DefaultErrorHandlerBuilder { } if (transactionTemplate == null) { - Map<String, PlatformTransactionManager> mapManager = routeContext.lookupByType(PlatformTransactionManager.class); + Map<String, PlatformTransactionManager> mapManager = camelContext.getRegistry().findByTypeWithName(PlatformTransactionManager.class); if (mapManager == null || mapManager.isEmpty()) { LOG.trace("No PlatformTransactionManager found in registry."); } else if (mapManager.size() == 1) { @@ -110,9 +112,9 @@ public class TransactionErrorHandlerBuilder extends DefaultErrorHandlerBuilder { ObjectHelper.notNull(transactionTemplate, "transactionTemplate", this); - TransactionErrorHandler answer = new TransactionErrorHandler(routeContext.getCamelContext(), processor, + TransactionErrorHandler answer = new TransactionErrorHandler(camelContext, processor, getLogger(), getOnRedelivery(), getRedeliveryPolicy(), getExceptionPolicyStrategy(), transactionTemplate, - getRetryWhilePolicy(routeContext.getCamelContext()), getExecutorService(routeContext.getCamelContext()), getRollbackLoggingLevel(), getOnExceptionOccurred()); + getRetryWhilePolicy(camelContext), getExecutorService(camelContext), getRollbackLoggingLevel(), getOnExceptionOccurred()); // configure error handler before we can use it configure(routeContext, answer); return answer; diff --git a/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerReifier.java b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerReifier.java index 90210dc..29dc00b 100644 --- a/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerReifier.java +++ b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerReifier.java @@ -18,7 +18,6 @@ package org.apache.camel.spring.spi; import java.util.Map; -import org.apache.camel.CamelContext; import org.apache.camel.ErrorHandlerFactory; import org.apache.camel.Processor; import org.apache.camel.reifier.errorhandler.DefaultErrorHandlerReifier; @@ -47,7 +46,7 @@ public class TransactionErrorHandlerReifier extends DefaultErrorHandlerReifier<T // lookup in context if no transaction template has been configured LOG.debug("No TransactionTemplate configured on TransactionErrorHandlerBuilder. Will try find it in the registry."); - Map<String, TransactedPolicy> mapPolicy = routeContext.lookupByType(TransactedPolicy.class); + Map<String, TransactedPolicy> mapPolicy = findByTypeWithName(TransactedPolicy.class); if (mapPolicy != null && mapPolicy.size() == 1) { TransactedPolicy policy = mapPolicy.values().iterator().next(); if (policy instanceof SpringTransactionPolicy) { @@ -56,14 +55,14 @@ public class TransactionErrorHandlerReifier extends DefaultErrorHandlerReifier<T } if (transactionTemplate == null) { - TransactedPolicy policy = routeContext.lookup(PROPAGATION_REQUIRED, TransactedPolicy.class); + TransactedPolicy policy = lookup(PROPAGATION_REQUIRED, TransactedPolicy.class); if (policy instanceof SpringTransactionPolicy) { transactionTemplate = ((SpringTransactionPolicy) policy).getTransactionTemplate(); } } if (transactionTemplate == null) { - Map<String, TransactionTemplate> mapTemplate = routeContext.lookupByType(TransactionTemplate.class); + Map<String, TransactionTemplate> mapTemplate = findByTypeWithName(TransactionTemplate.class); if (mapTemplate == null || mapTemplate.isEmpty()) { LOG.trace("No TransactionTemplate found in registry."); } else if (mapTemplate.size() == 1) { @@ -75,7 +74,7 @@ public class TransactionErrorHandlerReifier extends DefaultErrorHandlerReifier<T } if (transactionTemplate == null) { - Map<String, PlatformTransactionManager> mapManager = routeContext.lookupByType(PlatformTransactionManager.class); + Map<String, PlatformTransactionManager> mapManager = findByTypeWithName(PlatformTransactionManager.class); if (mapManager == null || mapManager.isEmpty()) { LOG.trace("No PlatformTransactionManager found in registry."); } else if (mapManager.size() == 1) { @@ -93,11 +92,11 @@ public class TransactionErrorHandlerReifier extends DefaultErrorHandlerReifier<T ObjectHelper.notNull(transactionTemplate, "transactionTemplate", this); - TransactionErrorHandler answer = new TransactionErrorHandler(routeContext.getCamelContext(), processor, + TransactionErrorHandler answer = new TransactionErrorHandler(camelContext, processor, definition.getLogger(), definition.getOnRedelivery(), definition.getRedeliveryPolicy(), definition.getExceptionPolicyStrategy(), transactionTemplate, - definition.getRetryWhilePolicy(routeContext.getCamelContext()), - getExecutorService(routeContext.getCamelContext()), + definition.getRetryWhilePolicy(camelContext), + getExecutorService(camelContext), definition.getRollbackLoggingLevel(), definition.getOnExceptionOccurred()); // configure error handler before we can use it configure(routeContext, answer); diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/EndpointReferenceTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/EndpointReferenceTest.java index fb1ef62..23c80cc 100644 --- a/components/camel-spring/src/test/java/org/apache/camel/spring/EndpointReferenceTest.java +++ b/components/camel-spring/src/test/java/org/apache/camel/spring/EndpointReferenceTest.java @@ -25,6 +25,7 @@ import org.apache.camel.impl.engine.DefaultRouteContext; import org.apache.camel.model.RouteDefinition; import org.apache.camel.spi.RouteContext; import org.apache.camel.spring.example.DummyBean; +import org.apache.camel.support.CamelContextHelper; import org.junit.Test; import org.springframework.context.support.AbstractXmlApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; @@ -78,9 +79,8 @@ public class EndpointReferenceTest extends SpringTestSupport { CamelContext context = applicationContext.getBean("camel2", CamelContext.class); RouteDefinition route = new RouteDefinition("temporary"); String routeId = route.idOrCreate(context.adapt(ExtendedCamelContext.class).getNodeIdFactory()); - RouteContext routeContext = new DefaultRouteContext(context, route, routeId); try { - routeContext.resolveEndpoint(null, "endpoint1"); + CamelContextHelper.resolveEndpoint(context, null, "endpoint1"); fail("Should have thrown exception"); } catch (NoSuchEndpointException exception) { assertTrue("Get a wrong exception message", exception.getMessage().contains("make sure the endpoint has the same camel context as the route does")); diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/RouteContext.java b/core/camel-api/src/main/java/org/apache/camel/spi/RouteContext.java index 22aa1db..8513863 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/RouteContext.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/RouteContext.java @@ -54,49 +54,6 @@ public interface RouteContext extends RuntimeConfiguration, EndpointAware { CamelContext getCamelContext(); /** - * Resolves an endpoint from the URI - * - * @param uri the URI - * @return the resolved endpoint - */ - Endpoint resolveEndpoint(String uri); - - /** - * Resolves an endpoint from either a URI or a named reference - * - * @param uri the URI or - * @param ref the named reference - * @return the resolved endpoint - */ - Endpoint resolveEndpoint(String uri, String ref); - - /** - * lookup an object by name and type - * - * @param name the name to lookup - * @param type the expected type - * @return the found object - */ - <T> T lookup(String name, Class<T> type); - - /** - * lookup an object by name and type or throws {@link org.apache.camel.NoSuchBeanException} if not found. - * - * @param name the name to lookup - * @param type the expected type - * @return the found object - */ - <T> T mandatoryLookup(String name, Class<T> type); - - /** - * lookup objects by type - * - * @param type the expected type - * @return the found objects with the name as the key in the map. Returns an empty map if none found. - */ - <T> Map<String, T> lookupByType(Class<T> type); - - /** * For completing the route creation, creating a single event driven route * for the current from endpoint with any processors required */ diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteContext.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteContext.java index 0269c4f..0ee844b 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteContext.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteContext.java @@ -29,7 +29,6 @@ import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; import org.apache.camel.ErrorHandlerFactory; import org.apache.camel.NamedNode; -import org.apache.camel.NoSuchEndpointException; import org.apache.camel.Processor; import org.apache.camel.Route; import org.apache.camel.RuntimeCamelException; @@ -44,7 +43,6 @@ import org.apache.camel.spi.RouteContext; import org.apache.camel.spi.RouteController; import org.apache.camel.spi.RouteError; import org.apache.camel.spi.RoutePolicy; -import org.apache.camel.support.CamelContextHelper; import org.apache.camel.util.ObjectHelper; /** @@ -119,68 +117,16 @@ public class DefaultRouteContext implements RouteContext { } @Override - public Endpoint resolveEndpoint(String uri) { - return CamelContextHelper.getMandatoryEndpoint(camelContext, uri); - } - - @Override - public Endpoint resolveEndpoint(String uri, String ref) { - Endpoint endpoint = null; - if (uri != null) { - endpoint = camelContext.getEndpoint(uri); - if (endpoint == null) { - throw new NoSuchEndpointException(uri); - } - } - if (ref != null) { - endpoint = lookup(ref, Endpoint.class); - if (endpoint == null) { - throw new NoSuchEndpointException("ref:" + ref, "check your camel registry with id " + ref); - } - // Check the endpoint has the right CamelContext - if (!this.getCamelContext().equals(endpoint.getCamelContext())) { - throw new NoSuchEndpointException("ref:" + ref, "make sure the endpoint has the same camel context as the route does."); - } - try { - // need add the endpoint into service - getCamelContext().addService(endpoint); - } catch (Exception ex) { - throw new RuntimeCamelException(ex); - } - } - if (endpoint == null) { - throw new IllegalArgumentException("Either 'uri' or 'ref' must be specified on: " + this); - } else { - return endpoint; - } - } - - @Override - public <T> T lookup(String name, Class<T> type) { - return getCamelContext().getRegistry().lookupByNameAndType(name, type); - } - - @Override - public <T> Map<String, T> lookupByType(Class<T> type) { - return getCamelContext().getRegistry().findByTypeWithName(type); - } - - @Override - public <T> T mandatoryLookup(String name, Class<T> type) { - return CamelContextHelper.mandatoryLookup(getCamelContext(), name, type); - } - - @Override public Route commit() { // now lets turn all of the event driven consumer processors into a single route if (!eventDrivenProcessors.isEmpty()) { // always use an pipeline even if there are only 1 processor as the pipeline // handles preparing the response from the exchange in regard to IN vs OUT messages etc - Processor target = new Pipeline(getCamelContext(), eventDrivenProcessors); + Processor target = new Pipeline(camelContext, eventDrivenProcessors); // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW - CamelInternalProcessor internal = new CamelInternalProcessor(getCamelContext(), target); - internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(this, getCamelContext())); + CamelInternalProcessor internal = new CamelInternalProcessor(camelContext, target); + internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(this, camelContext)); // and then optionally add route policy processor if a custom policy is set List<RoutePolicy> routePolicyList = getRoutePolicyList(); @@ -291,20 +237,20 @@ public class DefaultRouteContext implements RouteContext { return trace; } else { // fallback to the option from camel context - return getCamelContext().isTracing(); + return camelContext.isTracing(); } } @Override public String getTracingPattern() { // can only set this on context level - return getCamelContext().getTracingPattern(); + return camelContext.getTracingPattern(); } @Override public void setTracingPattern(String tracePattern) { // can only set this on context level - getCamelContext().setTracingPattern(tracePattern); + camelContext.setTracingPattern(tracePattern); } @Override @@ -318,7 +264,7 @@ public class DefaultRouteContext implements RouteContext { return backlogTrace; } else { // fallback to the option from camel context - return getCamelContext().isBacklogTracing(); + return camelContext.isBacklogTracing(); } } @@ -333,7 +279,7 @@ public class DefaultRouteContext implements RouteContext { return debug; } else { // fallback to the option from camel context - return getCamelContext().isDebugging(); + return camelContext.isDebugging(); } } @@ -348,7 +294,7 @@ public class DefaultRouteContext implements RouteContext { return messageHistory; } else { // fallback to the option from camel context - return getCamelContext().isMessageHistory(); + return camelContext.isMessageHistory(); } } @@ -363,7 +309,7 @@ public class DefaultRouteContext implements RouteContext { return logMask; } else { // fallback to the option from camel context - return getCamelContext().isLogMask(); + return camelContext.isLogMask(); } } @@ -378,7 +324,7 @@ public class DefaultRouteContext implements RouteContext { return logExhaustedMessageBody; } else { // fallback to the option from camel context - return getCamelContext().isLogExhaustedMessageBody(); + return camelContext.isLogExhaustedMessageBody(); } } @@ -393,7 +339,7 @@ public class DefaultRouteContext implements RouteContext { return streamCache; } else { // fallback to the option from camel context - return getCamelContext().isStreamCaching(); + return camelContext.isStreamCaching(); } } @@ -408,7 +354,7 @@ public class DefaultRouteContext implements RouteContext { return delay; } else { // fallback to the option from camel context - return getCamelContext().getDelayer(); + return camelContext.getDelayer(); } } @@ -454,25 +400,25 @@ public class DefaultRouteContext implements RouteContext { @Override public void setAllowUseOriginalMessage(Boolean allowUseOriginalMessage) { // can only be configured on CamelContext - getCamelContext().setAllowUseOriginalMessage(allowUseOriginalMessage); + camelContext.setAllowUseOriginalMessage(allowUseOriginalMessage); } @Override public Boolean isAllowUseOriginalMessage() { // can only be configured on CamelContext - return getCamelContext().isAllowUseOriginalMessage(); + return camelContext.isAllowUseOriginalMessage(); } @Override public Boolean isCaseInsensitiveHeaders() { // can only be configured on CamelContext - return getCamelContext().isCaseInsensitiveHeaders(); + return camelContext.isCaseInsensitiveHeaders(); } @Override public void setCaseInsensitiveHeaders(Boolean caseInsensitiveHeaders) { // can only be configured on CamelContext - getCamelContext().setCaseInsensitiveHeaders(caseInsensitiveHeaders); + camelContext.setCaseInsensitiveHeaders(caseInsensitiveHeaders); } @Override @@ -481,7 +427,7 @@ public class DefaultRouteContext implements RouteContext { return shutdownRoute; } else { // fallback to the option from camel context - return getCamelContext().getShutdownRoute(); + return camelContext.getShutdownRoute(); } } @@ -496,7 +442,7 @@ public class DefaultRouteContext implements RouteContext { return shutdownRunningTask; } else { // fallback to the option from camel context - return getCamelContext().getShutdownRunningTask(); + return camelContext.getShutdownRunningTask(); } } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java b/core/camel-core-engine/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java index 4d1b3f5..3fdd549 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java @@ -16,6 +16,7 @@ */ package org.apache.camel.builder; +import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; import org.apache.camel.ExchangePattern; import org.apache.camel.LoggingLevel; @@ -54,10 +55,11 @@ public class DeadLetterChannelBuilder extends DefaultErrorHandlerBuilder { public Processor createErrorHandler(RouteContext routeContext, Processor processor) throws Exception { validateDeadLetterUri(routeContext); - DeadLetterChannel answer = new DeadLetterChannel(routeContext.getCamelContext(), processor, getLogger(), getOnRedelivery(), getRedeliveryPolicy(), + CamelContext camelContext = routeContext.getCamelContext(); + DeadLetterChannel answer = new DeadLetterChannel(camelContext, processor, getLogger(), getOnRedelivery(), getRedeliveryPolicy(), getExceptionPolicyStrategy(), getFailureProcessor(), getDeadLetterUri(), isDeadLetterHandleNewException(), - isUseOriginalMessage(), isUseOriginalBody(), getRetryWhilePolicy(routeContext.getCamelContext()), - getExecutorService(routeContext.getCamelContext()), getOnPrepareFailure(), getOnExceptionOccurred()); + isUseOriginalMessage(), isUseOriginalBody(), getRetryWhilePolicy(camelContext), + getExecutorService(camelContext), getOnPrepareFailure(), getOnExceptionOccurred()); // configure error handler before we can use it configure(routeContext, answer); return answer; diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/TransactedReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/AbstractPolicyReifier.java similarity index 67% copy from core/camel-core-engine/src/main/java/org/apache/camel/reifier/TransactedReifier.java copy to core/camel-core-engine/src/main/java/org/apache/camel/reifier/AbstractPolicyReifier.java index 68b129c..8f111b4 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/TransactedReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/AbstractPolicyReifier.java @@ -19,67 +19,37 @@ package org.apache.camel.reifier; import java.lang.reflect.Method; import java.util.Map; +import org.apache.camel.CamelContext; import org.apache.camel.NoSuchBeanException; -import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; -import org.apache.camel.Service; import org.apache.camel.model.ProcessorDefinition; -import org.apache.camel.model.TransactedDefinition; -import org.apache.camel.processor.WrapProcessor; import org.apache.camel.spi.Policy; import org.apache.camel.spi.RouteContext; import org.apache.camel.spi.TransactedPolicy; -import org.apache.camel.support.CamelContextHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.apache.camel.model.TransactedDefinition.PROPAGATION_REQUIRED; -public class TransactedReifier extends ProcessorReifier<TransactedDefinition> { +public abstract class AbstractPolicyReifier<T extends ProcessorDefinition<?>> extends ProcessorReifier<T> { private static final Logger LOG = LoggerFactory.getLogger(TransactedReifier.class); - public TransactedReifier(RouteContext routeContext, ProcessorDefinition<?> definition) { - super(routeContext, (TransactedDefinition) definition); + public AbstractPolicyReifier(RouteContext routeContext, T definition) { + super(routeContext, definition); } - @Override - public Processor createProcessor() throws Exception { - Policy policy = resolvePolicy(routeContext); - org.apache.camel.util.ObjectHelper.notNull(policy, "policy", this); - - // before wrap - policy.beforeWrap(routeContext, definition); - - // create processor after the before wrap - Processor childProcessor = this.createChildProcessor(true); - - // wrap - Processor target = policy.wrap(routeContext, childProcessor); - - if (!(target instanceof Service)) { - // wrap the target so it becomes a service and we can manage its - // lifecycle - target = new WrapProcessor(target, childProcessor); - } - return target; + public AbstractPolicyReifier(CamelContext camelContext, T definition) { + super(camelContext, definition); } - protected Policy resolvePolicy(RouteContext routeContext) { - return resolvePolicy(routeContext, definition); - } - - public static Policy resolvePolicy(RouteContext routeContext, TransactedDefinition definition) { - if (definition.getPolicy() != null) { - return definition.getPolicy(); + public Policy resolvePolicy(Policy policy, String ref, Class<? extends Policy> type) { + if (policy != null) { + return policy; } - return resolvePolicy(routeContext, definition.getRef(), definition.getType()); - } - - public static Policy resolvePolicy(RouteContext routeContext, String ref, Class<? extends Policy> type) { // explicit ref given so lookup by it if (org.apache.camel.util.ObjectHelper.isNotEmpty(ref)) { - return CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), ref, Policy.class); + return mandatoryLookup(ref, Policy.class); } // no explicit reference given from user so we can use some convention @@ -90,7 +60,7 @@ public class TransactedReifier extends ProcessorReifier<TransactedDefinition> { if (type != null) { // try find by type, note that this method is not supported by all // registry - Map<String, ?> types = routeContext.lookupByType(type); + Map<String, ?> types = findByTypeWithName(type); if (types.size() == 1) { // only one policy defined so use it Object found = types.values().iterator().next(); @@ -103,18 +73,18 @@ public class TransactedReifier extends ProcessorReifier<TransactedDefinition> { // for transacted routing try the default REQUIRED name if (type == TransactedPolicy.class) { // still not found try with the default name PROPAGATION_REQUIRED - answer = routeContext.lookup(PROPAGATION_REQUIRED, TransactedPolicy.class); + answer = lookup(PROPAGATION_REQUIRED, TransactedPolicy.class); } // this logic only applies if we are a transacted policy // still no policy found then try lookup the platform transaction // manager and use it as policy if (answer == null && type == TransactedPolicy.class) { - Class<?> tmClazz = routeContext.getCamelContext().getClassResolver().resolveClass("org.springframework.transaction.PlatformTransactionManager"); + Class<?> tmClazz = camelContext.getClassResolver().resolveClass("org.springframework.transaction.PlatformTransactionManager"); if (tmClazz != null) { // see if we can find the platform transaction manager in the // registry - Map<String, ?> maps = routeContext.lookupByType(tmClazz); + Map<String, ?> maps = findByTypeWithName(tmClazz); if (maps.size() == 1) { // only one platform manager then use it as default and // create a transacted @@ -130,7 +100,7 @@ public class TransactedReifier extends ProcessorReifier<TransactedDefinition> { // route building Object transactionManager = maps.values().iterator().next(); LOG.debug("One instance of PlatformTransactionManager found in registry: {}", transactionManager); - Class<?> txClazz = routeContext.getCamelContext().getClassResolver().resolveClass("org.apache.camel.spring.spi.SpringTransactionPolicy"); + Class<?> txClazz = camelContext.getClassResolver().resolveClass("org.apache.camel.spring.spi.SpringTransactionPolicy"); if (txClazz != null) { LOG.debug("Creating a new temporary SpringTransactionPolicy using the PlatformTransactionManager: {}", transactionManager); TransactedPolicy txPolicy = org.apache.camel.support.ObjectHelper.newInstance(txClazz, TransactedPolicy.class); @@ -151,7 +121,7 @@ public class TransactedReifier extends ProcessorReifier<TransactedDefinition> { throw new NoSuchBeanException(null, "PlatformTransactionManager"); } else { throw new IllegalArgumentException("Found " + maps.size() + " PlatformTransactionManager in registry. " - + "Cannot determine which one to use. Please configure a TransactionTemplate on the transacted policy."); + + "Cannot determine which one to use. Please configure a TransactionTemplate on the transacted policy."); } } } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/AbstractReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/AbstractReifier.java index 28cbbaa..8e4d9b8 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/AbstractReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/AbstractReifier.java @@ -16,28 +16,36 @@ */ package org.apache.camel.reifier; +import java.util.Map; +import java.util.Set; + import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; import org.apache.camel.Expression; +import org.apache.camel.NoSuchBeanException; +import org.apache.camel.NoSuchEndpointException; import org.apache.camel.Predicate; import org.apache.camel.model.ExpressionSubElementDefinition; import org.apache.camel.model.language.ExpressionDefinition; import org.apache.camel.reifier.language.ExpressionReifier; +import org.apache.camel.spi.BeanRepository; import org.apache.camel.spi.RouteContext; import org.apache.camel.support.CamelContextHelper; +import org.apache.camel.util.ObjectHelper; -public abstract class AbstractReifier { +public abstract class AbstractReifier implements BeanRepository { protected final RouteContext routeContext; protected final CamelContext camelContext; public AbstractReifier(RouteContext routeContext) { - this.routeContext = routeContext; + this.routeContext = ObjectHelper.notNull(routeContext, "RouteContext"); this.camelContext = routeContext.getCamelContext(); } public AbstractReifier(CamelContext camelContext) { this.routeContext = null; - this.camelContext = camelContext; + this.camelContext = ObjectHelper.notNull(camelContext, "CamelContext"); } protected String parseString(String text) { @@ -115,4 +123,44 @@ public abstract class AbstractReifier { return s != null ? s.startsWith("#") ? s : "#" + s : null; } + protected BeanRepository getRegistry() { + return camelContext.getRegistry(); + } + + public <T> T mandatoryLookup(String name, Class<T> beanType) { + return CamelContextHelper.mandatoryLookup(camelContext, name, beanType); + } + + @Override + public Object lookupByName(String name) { + return getRegistry().lookupByName(name); + } + + public <T> T lookup(String name, Class<T> type) { + return lookupByNameAndType(name, type); + } + + public <T> T lookupByNameAndType(String name, Class<T> type) { + return getRegistry().lookupByNameAndType(name, type); + } + + @Override + public <T> Map<String, T> findByTypeWithName(Class<T> type) { + return getRegistry().findByTypeWithName(type); + } + + @Override + public <T> Set<T> findByType(Class<T> type) { + return getRegistry().findByType(type); + } + + @Override + public Object unwrap(Object value) { + return getRegistry().unwrap(value); + } + + public Endpoint resolveEndpoint(String uri) throws NoSuchEndpointException { + return CamelContextHelper.getMandatoryEndpoint(camelContext, uri); + } + } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/AggregateReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/AggregateReifier.java index 2703d4d..5f60275 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/AggregateReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/AggregateReifier.java @@ -55,7 +55,7 @@ public class AggregateReifier extends ProcessorReifier<AggregateDefinition> { internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext, camelContext)); Expression correlation = createExpression(definition.getExpression()); - AggregationStrategy strategy = createAggregationStrategy(routeContext); + AggregationStrategy strategy = createAggregationStrategy(); boolean parallel = parseBoolean(definition.getParallelProcessing(), false); boolean shutdownThreadPool = willCreateNewThreadPool(definition, parallel); @@ -76,7 +76,7 @@ public class AggregateReifier extends ProcessorReifier<AggregateDefinition> { } if (definition.getAggregateController() == null && definition.getAggregateControllerRef() != null) { - definition.setAggregateController(routeContext.mandatoryLookup(definition.getAggregateControllerRef(), AggregateController.class)); + definition.setAggregateController(mandatoryLookup(definition.getAggregateControllerRef(), AggregateController.class)); } // this EIP supports using a shared timeout checker thread pool or @@ -85,7 +85,7 @@ public class AggregateReifier extends ProcessorReifier<AggregateDefinition> { ScheduledExecutorService timeoutThreadPool = definition.getTimeoutCheckerExecutorService(); if (timeoutThreadPool == null && definition.getTimeoutCheckerExecutorServiceRef() != null) { // lookup existing thread pool - timeoutThreadPool = routeContext.lookup(definition.getTimeoutCheckerExecutorServiceRef(), ScheduledExecutorService.class); + timeoutThreadPool = lookup(definition.getTimeoutCheckerExecutorServiceRef(), ScheduledExecutorService.class); if (timeoutThreadPool == null) { // then create a thread pool assuming the ref is a thread pool // profile id @@ -214,10 +214,10 @@ public class AggregateReifier extends ProcessorReifier<AggregateDefinition> { return policy; } - private AggregationStrategy createAggregationStrategy(RouteContext routeContext) { + private AggregationStrategy createAggregationStrategy() { AggregationStrategy strategy = definition.getAggregationStrategy(); if (strategy == null && definition.getStrategyRef() != null) { - Object aggStrategy = routeContext.lookup(definition.getStrategyRef(), Object.class); + Object aggStrategy = lookup(definition.getStrategyRef(), Object.class); if (aggStrategy instanceof AggregationStrategy) { strategy = (AggregationStrategy)aggStrategy; } else if (aggStrategy != null) { @@ -246,7 +246,7 @@ public class AggregateReifier extends ProcessorReifier<AggregateDefinition> { private AggregationRepository createAggregationRepository(RouteContext routeContext) { AggregationRepository repository = definition.getAggregationRepository(); if (repository == null && definition.getAggregationRepositoryRef() != null) { - repository = routeContext.mandatoryLookup(definition.getAggregationRepositoryRef(), AggregationRepository.class); + repository = mandatoryLookup(definition.getAggregationRepositoryRef(), AggregationRepository.class); } return repository; } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ClaimCheckReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ClaimCheckReifier.java index bde4a0c..f725bc7 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ClaimCheckReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ClaimCheckReifier.java @@ -44,7 +44,7 @@ public class ClaimCheckReifier extends ProcessorReifier<ClaimCheckDefinition> { claim.setKey(parseString(definition.getKey())); claim.setFilter(parseString(definition.getFilter())); - AggregationStrategy strategy = createAggregationStrategy(routeContext); + AggregationStrategy strategy = createAggregationStrategy(); if (strategy != null) { claim.setAggregationStrategy(strategy); } @@ -102,10 +102,10 @@ public class ClaimCheckReifier extends ProcessorReifier<ClaimCheckDefinition> { return claim; } - private AggregationStrategy createAggregationStrategy(RouteContext routeContext) { + private AggregationStrategy createAggregationStrategy() { AggregationStrategy strategy = definition.getAggregationStrategy(); if (strategy == null && definition.getAggregationStrategyRef() != null) { - Object aggStrategy = routeContext.lookup(parseString(definition.getAggregationStrategyRef()), Object.class); + Object aggStrategy = lookup(parseString(definition.getAggregationStrategyRef()), Object.class); if (aggStrategy instanceof AggregationStrategy) { strategy = (AggregationStrategy)aggStrategy; } else if (aggStrategy != null) { diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/EnrichReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/EnrichReifier.java index 109ffc1..7dd0d53 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/EnrichReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/EnrichReifier.java @@ -58,7 +58,7 @@ public class EnrichReifier extends ExpressionReifier<EnrichDefinition> { private AggregationStrategy createAggregationStrategy() { AggregationStrategy strategy = definition.getAggregationStrategy(); if (strategy == null && definition.getAggregationStrategyRef() != null) { - Object aggStrategy = routeContext.lookup(definition.getAggregationStrategyRef(), Object.class); + Object aggStrategy = lookup(definition.getAggregationStrategyRef(), Object.class); if (aggStrategy instanceof AggregationStrategy) { strategy = (AggregationStrategy)aggStrategy; } else if (aggStrategy != null) { diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/IdempotentConsumerReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/IdempotentConsumerReifier.java index 693f092..ea01951 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/IdempotentConsumerReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/IdempotentConsumerReifier.java @@ -60,7 +60,7 @@ public class IdempotentConsumerReifier extends ExpressionReifier<IdempotentConsu */ protected <T> IdempotentRepository resolveMessageIdRepository(RouteContext routeContext) { if (definition.getMessageIdRepositoryRef() != null) { - definition.setMessageIdRepository(routeContext.mandatoryLookup(parseString(definition.getMessageIdRepositoryRef()), IdempotentRepository.class)); + definition.setMessageIdRepository(mandatoryLookup(parseString(definition.getMessageIdRepositoryRef()), IdempotentRepository.class)); } return definition.getMessageIdRepository(); } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/LogReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/LogReifier.java index 71a6d5c..7aa69a8 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/LogReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/LogReifier.java @@ -29,7 +29,6 @@ import org.apache.camel.processor.LogProcessor; import org.apache.camel.spi.CamelLogger; import org.apache.camel.spi.MaskingFormatter; import org.apache.camel.spi.RouteContext; -import org.apache.camel.support.CamelContextHelper; import org.apache.camel.support.processor.DefaultMaskingFormatter; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.StringHelper; @@ -54,13 +53,13 @@ public class LogReifier extends ProcessorReifier<LogDefinition> { // get logger which may be set in XML definition if (logger == null && ObjectHelper.isNotEmpty(definition.getLoggerRef())) { - logger = CamelContextHelper.mandatoryLookup(camelContext, definition.getLoggerRef(), Logger.class); + logger = mandatoryLookup(definition.getLoggerRef(), Logger.class); } if (logger == null) { // first - try to lookup single instance in the registry, just like // LogComponent - Map<String, Logger> availableLoggers = routeContext.lookupByType(Logger.class); + Map<String, Logger> availableLoggers = findByTypeWithName(Logger.class); if (availableLoggers.size() == 1) { logger = availableLoggers.values().iterator().next(); log.debug("Using custom Logger: {}", logger); @@ -89,12 +88,12 @@ public class LogReifier extends ProcessorReifier<LogDefinition> { LoggingLevel level = definition.getLoggingLevel() != null ? parse(LoggingLevel.class, definition.getLoggingLevel()) : LoggingLevel.INFO; CamelLogger camelLogger = new CamelLogger(logger, level, definition.getMarker()); - return new LogProcessor(exp, camelLogger, getMaskingFormatter(routeContext), camelContext.adapt(ExtendedCamelContext.class).getLogListeners()); + return new LogProcessor(exp, camelLogger, getMaskingFormatter(), camelContext.adapt(ExtendedCamelContext.class).getLogListeners()); } - private MaskingFormatter getMaskingFormatter(RouteContext routeContext) { + private MaskingFormatter getMaskingFormatter() { if (routeContext.isLogMask()) { - MaskingFormatter formatter = routeContext.lookup(MaskingFormatter.CUSTOM_LOG_MASK_REF, MaskingFormatter.class); + MaskingFormatter formatter = lookup(MaskingFormatter.CUSTOM_LOG_MASK_REF, MaskingFormatter.class); if (formatter == null) { formatter = new DefaultMaskingFormatter(); } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/MulticastReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/MulticastReifier.java index 1c57ed3..c4a689d 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/MulticastReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/MulticastReifier.java @@ -30,7 +30,6 @@ import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy; import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; import org.apache.camel.spi.RouteContext; -import org.apache.camel.support.CamelContextHelper; public class MulticastReifier extends ProcessorReifier<MulticastDefinition> { @@ -71,7 +70,7 @@ public class MulticastReifier extends ProcessorReifier<MulticastDefinition> { throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled."); } if (definition.getOnPrepareRef() != null) { - definition.setOnPrepare(CamelContextHelper.mandatoryLookup(camelContext, definition.getOnPrepareRef(), Processor.class)); + definition.setOnPrepare(mandatoryLookup(definition.getOnPrepareRef(), Processor.class)); } MulticastProcessor answer = new MulticastProcessor(camelContext, list, strategy, isParallelProcessing, threadPool, shutdownThreadPool, isStreaming, @@ -83,7 +82,7 @@ public class MulticastReifier extends ProcessorReifier<MulticastDefinition> { private AggregationStrategy createAggregationStrategy() { AggregationStrategy strategy = definition.getAggregationStrategy(); if (strategy == null && definition.getStrategyRef() != null) { - Object aggStrategy = routeContext.lookup(parseString(definition.getStrategyRef()), Object.class); + Object aggStrategy = lookup(parseString(definition.getStrategyRef()), Object.class); if (aggStrategy instanceof AggregationStrategy) { strategy = (AggregationStrategy)aggStrategy; } else if (aggStrategy != null) { diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/PolicyReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/PolicyReifier.java index a96db82..de5b2ec 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/PolicyReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/PolicyReifier.java @@ -25,7 +25,7 @@ import org.apache.camel.spi.Policy; import org.apache.camel.spi.RouteContext; import org.apache.camel.util.ObjectHelper; -public class PolicyReifier extends ProcessorReifier<PolicyDefinition> { +public class PolicyReifier extends AbstractPolicyReifier<PolicyDefinition> { public PolicyReifier(RouteContext routeContext, ProcessorDefinition<?> definition) { super(routeContext, (PolicyDefinition) definition); @@ -33,7 +33,7 @@ public class PolicyReifier extends ProcessorReifier<PolicyDefinition> { @Override public Processor createProcessor() throws Exception { - Policy policy = resolvePolicy(routeContext); + Policy policy = resolvePolicy(); ObjectHelper.notNull(policy, "policy", definition); // before wrap @@ -53,12 +53,8 @@ public class PolicyReifier extends ProcessorReifier<PolicyDefinition> { return target; } - protected Policy resolvePolicy(RouteContext routeContext) { - if (definition.getPolicy() != null) { - return definition.getPolicy(); - } - // reuse code on transacted definition to do the resolution - return TransactedReifier.resolvePolicy(routeContext, definition.getRef(), definition.getType()); + protected Policy resolvePolicy() { + return resolvePolicy(definition.getPolicy(), definition.getRef(), definition.getType()); } } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java index ed1123b..b885cfa 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java @@ -62,7 +62,7 @@ public class PollEnrichReifier extends ProcessorReifier<PollEnrichDefinition> { private AggregationStrategy createAggregationStrategy() { AggregationStrategy strategy = definition.getAggregationStrategy(); if (strategy == null && definition.getAggregationStrategyRef() != null) { - Object aggStrategy = routeContext.lookup(parseString(definition.getAggregationStrategyRef()), Object.class); + Object aggStrategy = lookup(parseString(definition.getAggregationStrategyRef()), Object.class); if (aggStrategy instanceof AggregationStrategy) { strategy = (AggregationStrategy)aggStrategy; } else if (aggStrategy != null) { diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessReifier.java index 2841268..9ad3c5c 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessReifier.java @@ -37,7 +37,7 @@ public class ProcessReifier extends ProcessorReifier<ProcessDefinition> { Processor answer = definition.getProcessor(); if (answer == null) { ObjectHelper.notNull(definition.getRef(), "ref", definition); - answer = routeContext.mandatoryLookup(definition.getRef(), Processor.class); + answer = mandatoryLookup(definition.getRef(), Processor.class); } // ensure its wrapped in a Service so we can manage it from eg. JMX diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java index ac898c7..68e6caf 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java @@ -111,7 +111,6 @@ import org.apache.camel.spi.LifecycleStrategy; import org.apache.camel.spi.ReifierStrategy; import org.apache.camel.spi.RouteContext; import org.apache.camel.spi.RouteIdAware; -import org.apache.camel.support.CamelContextHelper; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -243,7 +242,7 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends // no there is a custom thread pool configured return false; } else if (definition.getExecutorServiceRef() != null) { - ExecutorService answer = routeContext.lookup(definition.getExecutorServiceRef(), ExecutorService.class); + ExecutorService answer = lookup(definition.getExecutorServiceRef(), ExecutorService.class); // if no existing thread pool, then we will have to create a new // thread pool return answer == null; @@ -384,12 +383,12 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends */ public ScheduledExecutorService lookupScheduledExecutorServiceRef(String name, Object source, String executorServiceRef) { - ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager(); - ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext()); + ExecutorServiceManager manager = camelContext.getExecutorServiceManager(); + ObjectHelper.notNull(manager, "ExecutorServiceManager", camelContext); ObjectHelper.notNull(executorServiceRef, "executorServiceRef"); // lookup in registry first and use existing thread pool if exists - ScheduledExecutorService answer = routeContext.lookup(executorServiceRef, ScheduledExecutorService.class); + ScheduledExecutorService answer = lookup(executorServiceRef, ScheduledExecutorService.class); if (answer == null) { // then create a thread pool assuming the ref is a thread pool // profile id @@ -425,7 +424,7 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends ObjectHelper.notNull(executorServiceRef, "executorServiceRef"); // lookup in registry first and use existing thread pool if exists - ExecutorService answer = routeContext.lookup(executorServiceRef, ExecutorService.class); + ExecutorService answer = lookup(executorServiceRef, ExecutorService.class); if (answer == null) { // then create a thread pool assuming the ref is a thread pool // profile id diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RecipientListReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RecipientListReifier.java index e1602f3..81aac98 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RecipientListReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RecipientListReifier.java @@ -27,13 +27,11 @@ import org.apache.camel.Processor; import org.apache.camel.model.ProcessorDefinition; import org.apache.camel.model.RecipientListDefinition; import org.apache.camel.processor.EvaluateExpressionProcessor; -import org.apache.camel.processor.Pipeline; import org.apache.camel.processor.RecipientList; import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy; import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; import org.apache.camel.spi.RouteContext; -import org.apache.camel.support.CamelContextHelper; public class RecipientListReifier extends ProcessorReifier<RecipientListDefinition<?>> { @@ -59,7 +57,7 @@ public class RecipientListReifier extends ProcessorReifier<RecipientListDefiniti } else { answer = new RecipientList(camelContext, expression); } - answer.setAggregationStrategy(createAggregationStrategy(routeContext)); + answer.setAggregationStrategy(createAggregationStrategy()); answer.setParallelProcessing(isParallelProcessing); answer.setParallelAggregate(isParallelAggregate); answer.setStreaming(isStreaming); @@ -71,7 +69,7 @@ public class RecipientListReifier extends ProcessorReifier<RecipientListDefiniti answer.setCacheSize(parseInt(definition.getCacheSize())); } if (definition.getOnPrepareRef() != null) { - definition.setOnPrepare(CamelContextHelper.mandatoryLookup(camelContext, definition.getOnPrepareRef(), Processor.class)); + definition.setOnPrepare(mandatoryLookup(definition.getOnPrepareRef(), Processor.class)); } if (definition.getOnPrepare() != null) { answer.setOnPrepare(definition.getOnPrepare()); @@ -110,10 +108,10 @@ public class RecipientListReifier extends ProcessorReifier<RecipientListDefiniti return answer.newPipeline(camelContext, pipe); } - private AggregationStrategy createAggregationStrategy(RouteContext routeContext) { + private AggregationStrategy createAggregationStrategy() { AggregationStrategy strategy = definition.getAggregationStrategy(); if (strategy == null && definition.getStrategyRef() != null) { - Object aggStrategy = routeContext.lookup(parseString(definition.getStrategyRef()), Object.class); + Object aggStrategy = lookup(parseString(definition.getStrategyRef()), Object.class); if (aggStrategy instanceof AggregationStrategy) { strategy = (AggregationStrategy)aggStrategy; } else if (aggStrategy != null) { diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ResequenceReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ResequenceReifier.java index 882a581..2d8e4e5 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ResequenceReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ResequenceReifier.java @@ -29,7 +29,6 @@ import org.apache.camel.processor.StreamResequencer; import org.apache.camel.processor.resequencer.DefaultExchangeComparator; import org.apache.camel.processor.resequencer.ExpressionResultComparator; import org.apache.camel.spi.RouteContext; -import org.apache.camel.support.CamelContextHelper; import org.apache.camel.util.ObjectHelper; public class ResequenceReifier extends ProcessorReifier<ResequenceDefinition> { @@ -117,7 +116,7 @@ public class ResequenceReifier extends ProcessorReifier<ResequenceDefinition> { ExpressionResultComparator comparator; if (config.getComparatorRef() != null) { - comparator = CamelContextHelper.mandatoryLookup(camelContext, config.getComparatorRef(), ExpressionResultComparator.class); + comparator = mandatoryLookup(config.getComparatorRef(), ExpressionResultComparator.class); } else { comparator = config.getComparator(); if (comparator == null) { diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RouteReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RouteReifier.java index 283fce8..7b5b098 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RouteReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RouteReifier.java @@ -109,7 +109,7 @@ public class RouteReifier extends ProcessorReifier<RouteDefinition> { public Route createRoute() { try { - return doCreateRoute(camelContext, routeContext); + return doCreateRoute(); } catch (FailedToCreateRouteException e) { throw e; } catch (Exception e) { @@ -119,11 +119,6 @@ public class RouteReifier extends ProcessorReifier<RouteDefinition> { } } - public Endpoint resolveEndpoint(String uri) throws NoSuchEndpointException { - ObjectHelper.notNull(camelContext, "CamelContext"); - return CamelContextHelper.getMandatoryEndpoint(camelContext, uri); - } - /** * Advices this route with the route builder. * <p/> @@ -243,13 +238,13 @@ public class RouteReifier extends ProcessorReifier<RouteDefinition> { // Implementation methods // ------------------------------------------------------------------------- - protected Route doCreateRoute(CamelContext camelContext, RouteContext routeContext) throws Exception { + protected Route doCreateRoute() throws Exception { // configure error handler routeContext.setErrorHandlerFactory(definition.getErrorHandlerFactory()); // configure tracing if (definition.getTrace() != null) { - Boolean isTrace = CamelContextHelper.parseBoolean(camelContext, definition.getTrace()); + Boolean isTrace = parseBoolean(definition.getTrace()); if (isTrace != null) { routeContext.setTracing(isTrace); if (isTrace) { @@ -262,7 +257,7 @@ public class RouteReifier extends ProcessorReifier<RouteDefinition> { // configure message history if (definition.getMessageHistory() != null) { - Boolean isMessageHistory = CamelContextHelper.parseBoolean(camelContext, definition.getMessageHistory()); + Boolean isMessageHistory = parseBoolean(definition.getMessageHistory()); if (isMessageHistory != null) { routeContext.setMessageHistory(isMessageHistory); if (isMessageHistory) { @@ -273,7 +268,7 @@ public class RouteReifier extends ProcessorReifier<RouteDefinition> { // configure Log EIP mask if (definition.getLogMask() != null) { - Boolean isLogMask = CamelContextHelper.parseBoolean(camelContext, definition.getLogMask()); + Boolean isLogMask = parseBoolean(definition.getLogMask()); if (isLogMask != null) { routeContext.setLogMask(isLogMask); if (isLogMask) { @@ -284,7 +279,7 @@ public class RouteReifier extends ProcessorReifier<RouteDefinition> { // configure stream caching if (definition.getStreamCache() != null) { - Boolean isStreamCache = CamelContextHelper.parseBoolean(camelContext, definition.getStreamCache()); + Boolean isStreamCache = parseBoolean(definition.getStreamCache()); if (isStreamCache != null) { routeContext.setStreamCaching(isStreamCache); if (isStreamCache) { @@ -295,7 +290,7 @@ public class RouteReifier extends ProcessorReifier<RouteDefinition> { // configure delayer if (definition.getDelayer() != null) { - Long delayer = CamelContextHelper.parseLong(camelContext, definition.getDelayer()); + Long delayer = parseLong(definition.getDelayer()); if (delayer != null) { routeContext.setDelayer(delayer); if (delayer > 0) { @@ -317,7 +312,7 @@ public class RouteReifier extends ProcessorReifier<RouteDefinition> { StringTokenizer policyTokens = new StringTokenizer(definition.getRoutePolicyRef(), ","); while (policyTokens.hasMoreTokens()) { String ref = policyTokens.nextToken().trim(); - RoutePolicy policy = CamelContextHelper.mandatoryLookup(camelContext, ref, RoutePolicy.class); + RoutePolicy policy = mandatoryLookup(ref, RoutePolicy.class); log.debug("RoutePolicy is enabled: {} on route: {}", policy, definition.getId()); routeContext.getRoutePolicyList().add(policy); } @@ -333,7 +328,7 @@ public class RouteReifier extends ProcessorReifier<RouteDefinition> { } // configure auto startup - Boolean isAutoStartup = CamelContextHelper.parseBoolean(camelContext, definition.getAutoStartup()); + Boolean isAutoStartup = parseBoolean(definition.getAutoStartup()); if (isAutoStartup != null) { log.debug("Using AutoStartup {} on route: {}", isAutoStartup, definition.getId()); routeContext.setAutoStartup(isAutoStartup); @@ -364,7 +359,7 @@ public class RouteReifier extends ProcessorReifier<RouteDefinition> { if (def != null) { endpoint = def.resolve(camelContext); } else { - endpoint = routeContext.resolveEndpoint(definition.getInput().getEndpointUri()); + endpoint = resolveEndpoint(definition.getInput().getEndpointUri()); } } routeContext.setEndpoint(endpoint); diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/SagaReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/SagaReifier.java index 1baa76b..a6ce8b1 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/SagaReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/SagaReifier.java @@ -45,9 +45,13 @@ public class SagaReifier extends ProcessorReifier<SagaDefinition> { @Override public Processor createProcessor() throws Exception { - Optional<Endpoint> compensationEndpoint = Optional.ofNullable(definition.getCompensation()).map(SagaActionUriDefinition::getUri).map(routeContext::resolveEndpoint); + Optional<Endpoint> compensationEndpoint = Optional.ofNullable(definition.getCompensation()) + .map(SagaActionUriDefinition::getUri) + .map(this::resolveEndpoint); - Optional<Endpoint> completionEndpoint = Optional.ofNullable(definition.getCompletion()).map(SagaActionUriDefinition::getUri).map(routeContext::resolveEndpoint); + Optional<Endpoint> completionEndpoint = Optional.ofNullable(definition.getCompletion()) + .map(SagaActionUriDefinition::getUri) + .map(this::resolveEndpoint); Map<String, Expression> optionsMap = new TreeMap<>(); if (definition.getOptions() != null) { @@ -90,7 +94,7 @@ public class SagaReifier extends ProcessorReifier<SagaDefinition> { } if (definition.getSagaServiceRef() != null) { - return CamelContextHelper.mandatoryLookup(context, parseString(definition.getSagaServiceRef()), CamelSagaService.class); + return mandatoryLookup(parseString(definition.getSagaServiceRef()), CamelSagaService.class); } sagaService = context.hasService(CamelSagaService.class); diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/SendReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/SendReifier.java index 583cefa..09a9ea5 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/SendReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/SendReifier.java @@ -23,6 +23,7 @@ import org.apache.camel.model.ProcessorDefinition; import org.apache.camel.model.SendDefinition; import org.apache.camel.processor.SendProcessor; import org.apache.camel.spi.RouteContext; +import org.apache.camel.support.CamelContextHelper; public class SendReifier extends ProcessorReifier<SendDefinition<?>> { @@ -32,16 +33,16 @@ public class SendReifier extends ProcessorReifier<SendDefinition<?>> { @Override public Processor createProcessor() throws Exception { - Endpoint endpoint = resolveEndpoint(routeContext); + Endpoint endpoint = resolveEndpoint(); return new SendProcessor(endpoint, parse(ExchangePattern.class, definition.getPattern())); } - public Endpoint resolveEndpoint(RouteContext context) { + public Endpoint resolveEndpoint() { if (definition.getEndpoint() == null) { if (definition.getEndpointProducerBuilder() == null) { - return context.resolveEndpoint(definition.getEndpointUri(), (String)null); + return CamelContextHelper.resolveEndpoint(camelContext, definition.getEndpointUri(), (String)null); } else { - return definition.getEndpointProducerBuilder().resolve(context.getCamelContext()); + return definition.getEndpointProducerBuilder().resolve(camelContext); } } else { return definition.getEndpoint(); diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/SortReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/SortReifier.java index ae5b053..dc3bc07 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/SortReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/SortReifier.java @@ -40,7 +40,7 @@ public class SortReifier<T, U extends SortDefinition<T>> extends ExpressionReifi public Processor createProcessor() throws Exception { // lookup in registry if (isNotEmpty(definition.getComparatorRef())) { - definition.setComparator(routeContext.lookup(parseString(definition.getComparatorRef()), Comparator.class)); + definition.setComparator(lookup(parseString(definition.getComparatorRef()), Comparator.class)); } // if no comparator then default on to string representation diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/SplitReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/SplitReifier.java index 54e1997..4c4e592 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/SplitReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/SplitReifier.java @@ -28,7 +28,6 @@ import org.apache.camel.processor.Splitter; import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy; import org.apache.camel.spi.RouteContext; -import org.apache.camel.support.CamelContextHelper; public class SplitReifier extends ExpressionReifier<SplitDefinition> { @@ -55,7 +54,7 @@ public class SplitReifier extends ExpressionReifier<SplitDefinition> { throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled."); } if (definition.getOnPrepareRef() != null) { - definition.setOnPrepare(CamelContextHelper.mandatoryLookup(camelContext, parseString(definition.getOnPrepareRef()), Processor.class)); + definition.setOnPrepare(mandatoryLookup(parseString(definition.getOnPrepareRef()), Processor.class)); } Expression exp = createExpression(definition.getExpression()); @@ -69,7 +68,7 @@ public class SplitReifier extends ExpressionReifier<SplitDefinition> { private AggregationStrategy createAggregationStrategy() { AggregationStrategy strategy = definition.getAggregationStrategy(); if (strategy == null && definition.getStrategyRef() != null) { - Object aggStrategy = routeContext.lookup(definition.getStrategyRef(), Object.class); + Object aggStrategy = lookup(definition.getStrategyRef(), Object.class); if (aggStrategy instanceof AggregationStrategy) { strategy = (AggregationStrategy)aggStrategy; } else if (aggStrategy != null) { diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ThrowExceptionReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ThrowExceptionReifier.java index 928ca52..efd6c35 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ThrowExceptionReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ThrowExceptionReifier.java @@ -17,7 +17,6 @@ package org.apache.camel.reifier; import org.apache.camel.Processor; -import org.apache.camel.RuntimeCamelException; import org.apache.camel.model.ProcessorDefinition; import org.apache.camel.model.ThrowExceptionDefinition; import org.apache.camel.processor.ThrowExceptionProcessor; @@ -33,7 +32,7 @@ public class ThrowExceptionReifier extends ProcessorReifier<ThrowExceptionDefini public Processor createProcessor() { Exception exception = definition.getException(); if (exception == null && definition.getRef() != null) { - exception = routeContext.lookup(parseString(definition.getRef()), Exception.class); + exception = lookup(parseString(definition.getRef()), Exception.class); } Class<? extends Exception> exceptionClass = definition.getExceptionClass(); diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/TransactedReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/TransactedReifier.java index 68b129c..3750a8e 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/TransactedReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/TransactedReifier.java @@ -16,36 +16,28 @@ */ package org.apache.camel.reifier; -import java.lang.reflect.Method; -import java.util.Map; - -import org.apache.camel.NoSuchBeanException; +import org.apache.camel.CamelContext; import org.apache.camel.Processor; -import org.apache.camel.RuntimeCamelException; import org.apache.camel.Service; import org.apache.camel.model.ProcessorDefinition; import org.apache.camel.model.TransactedDefinition; import org.apache.camel.processor.WrapProcessor; import org.apache.camel.spi.Policy; import org.apache.camel.spi.RouteContext; -import org.apache.camel.spi.TransactedPolicy; -import org.apache.camel.support.CamelContextHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.camel.model.TransactedDefinition.PROPAGATION_REQUIRED; - -public class TransactedReifier extends ProcessorReifier<TransactedDefinition> { - private static final Logger LOG = LoggerFactory.getLogger(TransactedReifier.class); +public class TransactedReifier extends AbstractPolicyReifier<TransactedDefinition> { public TransactedReifier(RouteContext routeContext, ProcessorDefinition<?> definition) { super(routeContext, (TransactedDefinition) definition); } + public TransactedReifier(CamelContext camelContext, ProcessorDefinition<?> definition) { + super(camelContext, (TransactedDefinition) definition); + } + @Override public Processor createProcessor() throws Exception { - Policy policy = resolvePolicy(routeContext); + Policy policy = resolvePolicy(); org.apache.camel.util.ObjectHelper.notNull(policy, "policy", this); // before wrap @@ -65,98 +57,8 @@ public class TransactedReifier extends ProcessorReifier<TransactedDefinition> { return target; } - protected Policy resolvePolicy(RouteContext routeContext) { - return resolvePolicy(routeContext, definition); + public Policy resolvePolicy() { + return resolvePolicy(definition.getPolicy(), definition.getRef(), definition.getType()); } - public static Policy resolvePolicy(RouteContext routeContext, TransactedDefinition definition) { - if (definition.getPolicy() != null) { - return definition.getPolicy(); - } - return resolvePolicy(routeContext, definition.getRef(), definition.getType()); - } - - public static Policy resolvePolicy(RouteContext routeContext, String ref, Class<? extends Policy> type) { - // explicit ref given so lookup by it - if (org.apache.camel.util.ObjectHelper.isNotEmpty(ref)) { - return CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), ref, Policy.class); - } - - // no explicit reference given from user so we can use some convention - // over configuration here - - // try to lookup by scoped type - Policy answer = null; - if (type != null) { - // try find by type, note that this method is not supported by all - // registry - Map<String, ?> types = routeContext.lookupByType(type); - if (types.size() == 1) { - // only one policy defined so use it - Object found = types.values().iterator().next(); - if (type.isInstance(found)) { - return type.cast(found); - } - } - } - - // for transacted routing try the default REQUIRED name - if (type == TransactedPolicy.class) { - // still not found try with the default name PROPAGATION_REQUIRED - answer = routeContext.lookup(PROPAGATION_REQUIRED, TransactedPolicy.class); - } - - // this logic only applies if we are a transacted policy - // still no policy found then try lookup the platform transaction - // manager and use it as policy - if (answer == null && type == TransactedPolicy.class) { - Class<?> tmClazz = routeContext.getCamelContext().getClassResolver().resolveClass("org.springframework.transaction.PlatformTransactionManager"); - if (tmClazz != null) { - // see if we can find the platform transaction manager in the - // registry - Map<String, ?> maps = routeContext.lookupByType(tmClazz); - if (maps.size() == 1) { - // only one platform manager then use it as default and - // create a transacted - // policy with it and default to required - - // as we do not want dependency on spring jars in the - // camel-core we use - // reflection to lookup classes and create new objects and - // call methods - // as this is only done during route building it does not - // matter that we - // use reflection as performance is no a concern during - // route building - Object transactionManager = maps.values().iterator().next(); - LOG.debug("One instance of PlatformTransactionManager found in registry: {}", transactionManager); - Class<?> txClazz = routeContext.getCamelContext().getClassResolver().resolveClass("org.apache.camel.spring.spi.SpringTransactionPolicy"); - if (txClazz != null) { - LOG.debug("Creating a new temporary SpringTransactionPolicy using the PlatformTransactionManager: {}", transactionManager); - TransactedPolicy txPolicy = org.apache.camel.support.ObjectHelper.newInstance(txClazz, TransactedPolicy.class); - Method method; - try { - method = txClazz.getMethod("setTransactionManager", tmClazz); - } catch (NoSuchMethodException e) { - throw new RuntimeCamelException("Cannot get method setTransactionManager(PlatformTransactionManager) on class: " + txClazz); - } - org.apache.camel.support.ObjectHelper.invokeMethod(method, txPolicy, transactionManager); - return txPolicy; - } else { - // camel-spring is missing on the classpath - throw new RuntimeCamelException("Cannot create a transacted policy as camel-spring.jar is not on the classpath!"); - } - } else { - if (maps.isEmpty()) { - throw new NoSuchBeanException(null, "PlatformTransactionManager"); - } else { - throw new IllegalArgumentException("Found " + maps.size() + " PlatformTransactionManager in registry. " - + "Cannot determine which one to use. Please configure a TransactionTemplate on the transacted policy."); - } - } - } - } - - return answer; - } } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/WireTapReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/WireTapReifier.java index 2c40e1d..8edaae7 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/WireTapReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/WireTapReifier.java @@ -29,7 +29,6 @@ import org.apache.camel.processor.CamelInternalProcessor; import org.apache.camel.processor.SendDynamicProcessor; import org.apache.camel.processor.WireTapProcessor; import org.apache.camel.spi.RouteContext; -import org.apache.camel.support.CamelContextHelper; public class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> { @@ -66,7 +65,7 @@ public class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> { answer.setCopy(isCopy); Processor newExchangeProcessor = definition.getNewExchangeProcessor(); if (definition.getNewExchangeProcessorRef() != null) { - newExchangeProcessor = routeContext.mandatoryLookup(parseString(definition.getNewExchangeProcessorRef()), Processor.class); + newExchangeProcessor = mandatoryLookup(parseString(definition.getNewExchangeProcessorRef()), Processor.class); } if (newExchangeProcessor != null) { answer.addNewExchangeProcessor(newExchangeProcessor); @@ -82,7 +81,7 @@ public class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> { } Processor onPrepare = definition.getOnPrepare(); if (definition.getOnPrepareRef() != null) { - onPrepare = CamelContextHelper.mandatoryLookup(camelContext, parseString(definition.getOnPrepareRef()), Processor.class); + onPrepare = mandatoryLookup(parseString(definition.getOnPrepareRef()), Processor.class); } if (onPrepare != null) { answer.setOnPrepare(onPrepare); diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/dataformat/DataFormatReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/dataformat/DataFormatReifier.java index 792c85c..a3c2712 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/dataformat/DataFormatReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/dataformat/DataFormatReifier.java @@ -75,6 +75,7 @@ import org.apache.camel.spi.DataFormatContentTypeHeader; import org.apache.camel.spi.PropertyConfigurer; import org.apache.camel.spi.PropertyConfigurerAware; import org.apache.camel.spi.ReifierStrategy; +import org.apache.camel.support.CamelContextHelper; import org.apache.camel.support.PropertyBindingSupport; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; @@ -163,7 +164,7 @@ public abstract class DataFormatReifier<T extends DataFormatDefinition> extends if (type == null) { ObjectHelper.notNull(ref, "ref or type"); - DataFormat dataFormat = camelContext.getRegistry().lookupByNameAndType(ref, DataFormat.class); + DataFormat dataFormat = CamelContextHelper.lookup(camelContext, ref, DataFormat.class); if (dataFormat != null) { return dataFormat; } @@ -264,7 +265,7 @@ public abstract class DataFormatReifier<T extends DataFormatDefinition> extends } if (configurer == null) { final String configurerName = name + "-dataformat-configurer"; - configurer = camelContext.getRegistry().lookupByNameAndType(configurerName, PropertyConfigurer.class); + configurer = lookup(configurerName, PropertyConfigurer.class); if (LOG.isDebugEnabled() && configurer != null) { LOG.debug("Discovered dataformat property configurer using the Camel registry: {} -> {}", configurerName, configurer); } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/DefaultErrorHandlerReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/DefaultErrorHandlerReifier.java index 30262b5..7235574 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/DefaultErrorHandlerReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/DefaultErrorHandlerReifier.java @@ -51,7 +51,7 @@ public class DefaultErrorHandlerReifier<T extends DefaultErrorHandlerBuilder> ex // camel context will shutdown the executor when it shutdown so no // need to shut it down when stopping if (executorServiceRef != null) { - executorService = camelContext.getRegistry().lookupByNameAndType(executorServiceRef, ScheduledExecutorService.class); + executorService = lookup(executorServiceRef, ScheduledExecutorService.class); if (executorService == null) { ExecutorServiceManager manager = camelContext.getExecutorServiceManager(); ThreadPoolProfile profile = manager.getThreadPoolProfile(executorServiceRef); diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerReifier.java index c190132..0d6f3d7 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerReifier.java @@ -184,6 +184,7 @@ public abstract class ErrorHandlerReifier<T extends ErrorHandlerBuilderSupport> */ public static ErrorHandlerFactory lookupErrorHandlerFactory(RouteContext routeContext, String ref, boolean mandatory) { ErrorHandlerFactory answer; + CamelContext camelContext = routeContext.getCamelContext(); // if the ref is the default then we do not have any explicit error // handler configured @@ -204,7 +205,7 @@ public abstract class ErrorHandlerReifier<T extends ErrorHandlerBuilderSupport> // the other has also no explicit error handler configured // then fallback to the handler // configured on the parent camel context - answer = lookupErrorHandlerFactory(routeContext.getCamelContext()); + answer = lookupErrorHandlerFactory(camelContext); } if (answer == null) { // the other has also no explicit error handler configured @@ -222,9 +223,9 @@ public abstract class ErrorHandlerReifier<T extends ErrorHandlerBuilderSupport> } else { // use specific configured error handler if (mandatory) { - answer = routeContext.mandatoryLookup(ref, ErrorHandlerBuilder.class); + answer = CamelContextHelper.mandatoryLookup(camelContext, ref, ErrorHandlerBuilder.class); } else { - answer = routeContext.lookup(ref, ErrorHandlerBuilder.class); + answer = CamelContextHelper.lookup(camelContext, ref, ErrorHandlerBuilder.class); } } @@ -237,7 +238,7 @@ public abstract class ErrorHandlerReifier<T extends ErrorHandlerBuilderSupport> ErrorHandlerBuilderRef other = (ErrorHandlerBuilderRef)answer; String otherRef = other.getRef(); if (isErrorHandlerFactoryConfigured(otherRef)) { - answer = camelContext.getRegistry().lookupByNameAndType(otherRef, ErrorHandlerBuilder.class); + answer = CamelContextHelper.lookup(camelContext, otherRef, ErrorHandlerBuilder.class); if (answer == null) { throw new IllegalArgumentException("ErrorHandlerBuilder with id " + otherRef + " not found in registry."); } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/loadbalancer/CustomLoadBalancerReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/loadbalancer/CustomLoadBalancerReifier.java index cba3d83..fa13084 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/loadbalancer/CustomLoadBalancerReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/loadbalancer/CustomLoadBalancerReifier.java @@ -20,7 +20,6 @@ import org.apache.camel.model.LoadBalancerDefinition; import org.apache.camel.model.loadbalancer.CustomLoadBalancerDefinition; import org.apache.camel.processor.loadbalancer.LoadBalancer; import org.apache.camel.spi.RouteContext; -import org.apache.camel.support.CamelContextHelper; import org.apache.camel.util.StringHelper; public class CustomLoadBalancerReifier extends LoadBalancerReifier<CustomLoadBalancerDefinition> { @@ -35,7 +34,7 @@ public class CustomLoadBalancerReifier extends LoadBalancerReifier<CustomLoadBal return definition.getCustomLoadBalancer(); } StringHelper.notEmpty(definition.getRef(), "ref", this); - return CamelContextHelper.mandatoryLookup(camelContext, definition.getRef(), LoadBalancer.class); + return mandatoryLookup(definition.getRef(), LoadBalancer.class); } } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/rest/RestBindingReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/rest/RestBindingReifier.java index 850226a..7c00ba8 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/rest/RestBindingReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/rest/RestBindingReifier.java @@ -79,7 +79,7 @@ public class RestBindingReifier extends AbstractReifier { String name = config.getJsonDataFormat(); if (name != null) { // must only be a name, not refer to an existing instance - Object instance = camelContext.getRegistry().lookupByName(name); + Object instance = lookupByName(name); if (instance != null) { throw new IllegalArgumentException("JsonDataFormat name: " + name + " must not be an existing bean instance from the registry"); } @@ -103,7 +103,7 @@ public class RestBindingReifier extends AbstractReifier { String name = config.getXmlDataFormat(); if (name != null) { // must only be a name, not refer to an existing instance - Object instance = camelContext.getRegistry().lookupByName(name); + Object instance = lookupByName(name); if (instance != null) { throw new IllegalArgumentException("XmlDataFormat name: " + name + " must not be an existing bean instance from the registry"); } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/transformer/CustomTransformeReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/transformer/CustomTransformeReifier.java index 9069623..87f2f78 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/transformer/CustomTransformeReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/transformer/CustomTransformeReifier.java @@ -34,7 +34,7 @@ public class CustomTransformeReifier extends TransformerReifier<CustomTransforme } Transformer transformer; if (definition.getRef() != null) { - transformer = camelContext.getRegistry().lookupByNameAndType(parseString(definition.getRef()), Transformer.class); + transformer = lookup(parseString(definition.getRef()), Transformer.class); if (transformer == null) { throw new IllegalArgumentException("Cannot find transformer with ref:" + definition.getRef()); } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/transformer/EndpointTransformeReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/transformer/EndpointTransformeReifier.java index 6304a2c..27ff407 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/transformer/EndpointTransformeReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/transformer/EndpointTransformeReifier.java @@ -34,7 +34,7 @@ public class EndpointTransformeReifier extends TransformerReifier<EndpointTransf @Override protected Transformer doCreateTransformer() { Endpoint endpoint = definition.getUri() != null ? camelContext.getEndpoint(definition.getUri()) - : camelContext.getRegistry().lookupByNameAndType(parseString(definition.getRef()), Endpoint.class); + : lookup(parseString(definition.getRef()), Endpoint.class); SendProcessor processor = new SendProcessor(endpoint, ExchangePattern.InOut); return new ProcessorTransformer(camelContext).setProcessor(processor) .setModel(parseString(definition.getScheme())) diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/validator/CustomValidatorReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/validator/CustomValidatorReifier.java index bca3380..4510212 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/validator/CustomValidatorReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/validator/CustomValidatorReifier.java @@ -34,7 +34,7 @@ public class CustomValidatorReifier extends ValidatorReifier<CustomValidatorDefi } Validator validator; if (definition.getRef() != null) { - validator = camelContext.getRegistry().lookupByNameAndType(definition.getRef(), Validator.class); + validator = lookup(definition.getRef(), Validator.class); if (validator == null) { throw new IllegalArgumentException("Cannot find validator with ref:" + definition.getRef()); } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/validator/EndpointValidatorReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/validator/EndpointValidatorReifier.java index 7652bce..c3e8bf6 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/validator/EndpointValidatorReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/validator/EndpointValidatorReifier.java @@ -33,7 +33,7 @@ public class EndpointValidatorReifier extends ValidatorReifier<EndpointValidator @Override protected Validator doCreateValidator() { - Endpoint endpoint = definition.getUri() != null ? camelContext.getEndpoint(definition.getUri()) : camelContext.getRegistry().lookupByNameAndType(definition.getRef(), Endpoint.class); + Endpoint endpoint = definition.getUri() != null ? camelContext.getEndpoint(definition.getUri()) : lookup(definition.getRef(), Endpoint.class); SendProcessor processor = new SendProcessor(endpoint, ExchangePattern.InOut); return new ProcessorValidator(camelContext).setProcessor(processor).setType(definition.getType()); } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/CamelContextHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/CamelContextHelper.java index 08f1b46..20589ee 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/CamelContextHelper.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/CamelContextHelper.java @@ -16,11 +16,9 @@ */ package org.apache.camel.support; -import java.util.Locale; import java.util.Map; import java.util.Properties; import java.util.Set; -import java.util.function.Function; import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; @@ -31,6 +29,7 @@ import org.apache.camel.NoSuchBeanException; import org.apache.camel.NoSuchEndpointException; import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.spi.NormalizedEndpointUri; +import org.apache.camel.RuntimeCamelException; import org.apache.camel.spi.RouteStartupOrder; import org.apache.camel.util.ObjectHelper; @@ -117,6 +116,37 @@ public final class CamelContextHelper { return ObjectHelper.cast(type, endpoint); } + public static Endpoint resolveEndpoint(CamelContext camelContext, String uri, String ref) { + Endpoint endpoint = null; + if (uri != null) { + endpoint = camelContext.getEndpoint(uri); + if (endpoint == null) { + throw new NoSuchEndpointException(uri); + } + } + if (ref != null) { + endpoint = camelContext.getRegistry().lookupByNameAndType(ref, Endpoint.class); + if (endpoint == null) { + throw new NoSuchEndpointException("ref:" + ref, "check your camel registry with id " + ref); + } + // Check the endpoint has the right CamelContext + if (!camelContext.equals(endpoint.getCamelContext())) { + throw new NoSuchEndpointException("ref:" + ref, "make sure the endpoint has the same camel context as the route does."); + } + try { + // need add the endpoint into service + camelContext.addService(endpoint); + } catch (Exception ex) { + throw new RuntimeCamelException(ex); + } + } + if (endpoint == null) { + throw new IllegalArgumentException("Either 'uri' or 'ref' must be specified"); + } else { + return endpoint; + } + } + /** * Converts the given value to the requested type */
