This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 176153ca1329ee3fb376d1b1ca283d7186bb94f4 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Tue Jun 11 14:25:30 2019 +0200 CAMEL-13627: camel-main - Reuse options for main and camel-spring-boo --- .../camel/main/DefaultConfigurationConfigurer.java | 201 ++++++++++++++++++++- .../java/org/apache/camel/main/MainSupport.java | 194 +------------------- 2 files changed, 200 insertions(+), 195 deletions(-) diff --git a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java index 60073f8..c13f041 100644 --- a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java +++ b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java @@ -16,11 +16,48 @@ */ package org.apache.camel.main; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Predicate; + import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.ExtendedCamelContext; +import org.apache.camel.TypeConverters; +import org.apache.camel.cloud.ServiceRegistry; +import org.apache.camel.cluster.CamelClusterService; +import org.apache.camel.health.HealthCheckRegistry; +import org.apache.camel.health.HealthCheckRepository; +import org.apache.camel.health.HealthCheckService; import org.apache.camel.model.Model; +import org.apache.camel.processor.interceptor.BacklogTracer; +import org.apache.camel.processor.interceptor.HandleFault; +import org.apache.camel.spi.AsyncProcessorAwaitManager; +import org.apache.camel.spi.EndpointStrategy; +import org.apache.camel.spi.EventFactory; +import org.apache.camel.spi.EventNotifier; +import org.apache.camel.spi.ExecutorServiceManager; +import org.apache.camel.spi.InflightRepository; +import org.apache.camel.spi.InterceptStrategy; +import org.apache.camel.spi.LifecycleStrategy; +import org.apache.camel.spi.LogListener; +import org.apache.camel.spi.ManagementObjectNameStrategy; +import org.apache.camel.spi.ManagementStrategy; +import org.apache.camel.spi.Registry; +import org.apache.camel.spi.RouteController; +import org.apache.camel.spi.RoutePolicyFactory; +import org.apache.camel.spi.RuntimeEndpointRegistry; +import org.apache.camel.spi.ShutdownStrategy; import org.apache.camel.spi.StreamCachingStrategy; +import org.apache.camel.spi.ThreadPoolProfile; +import org.apache.camel.spi.UnitOfWorkFactory; +import org.apache.camel.spi.UuidGenerator; +import org.apache.camel.support.jsse.GlobalSSLContextParametersSupplier; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * To configure the {@link DefaultConfigurationProperties} on {@link org.apache.camel.CamelContext} @@ -28,6 +65,8 @@ import org.apache.camel.spi.StreamCachingStrategy; */ public final class DefaultConfigurationConfigurer { + public static final Logger LOG = LoggerFactory.getLogger(DefaultConfigurationConfigurer.class); + private DefaultConfigurationConfigurer() { } @@ -37,7 +76,7 @@ public final class DefaultConfigurationConfigurer { * @param camelContext the camel context * @param config the configuration */ - public static void configure(CamelContext camelContext, DefaultConfigurationProperties config) { + public static void configure(CamelContext camelContext, DefaultConfigurationProperties config) throws Exception { if (!config.isJmxEnabled()) { camelContext.disableJMX(); } @@ -112,7 +151,165 @@ public final class DefaultConfigurationConfigurer { if (config.getRouteFilterIncludePattern() != null || config.getRouteFilterExcludePattern() != null) { camelContext.getExtension(Model.class).setRouteFilterPattern(config.getRouteFilterIncludePattern(), config.getRouteFilterExcludePattern()); } + } + + /** + * Performs additional configuration to lookup beans of Camel types to configure + * additional configurations on the Camel context. + * <p/> + * Similar code in camel-core-xml module in class org.apache.camel.core.xml.AbstractCamelContextFactoryBean + * or in camel-spring-boot module in class org.apache.camel.spring.boot.CamelAutoConfiguration. + */ + public static void afterPropertiesSet(CamelContext camelContext, Registry registry) throws Exception { + final ManagementStrategy managementStrategy = camelContext.getManagementStrategy(); + + registerPropertyForBeanType(registry, BacklogTracer.class, bt -> camelContext.setExtension(BacklogTracer.class, bt)); + registerPropertyForBeanType(registry, HandleFault.class, camelContext.adapt(ExtendedCamelContext.class)::addInterceptStrategy); + registerPropertyForBeanType(registry, InflightRepository.class, camelContext::setInflightRepository); + registerPropertyForBeanType(registry, AsyncProcessorAwaitManager.class, camelContext.adapt(ExtendedCamelContext.class)::setAsyncProcessorAwaitManager); + registerPropertyForBeanType(registry, ManagementStrategy.class, camelContext::setManagementStrategy); + registerPropertyForBeanType(registry, ManagementObjectNameStrategy.class, managementStrategy::setManagementObjectNameStrategy); + registerPropertyForBeanType(registry, EventFactory.class, managementStrategy::setEventFactory); + registerPropertyForBeanType(registry, UnitOfWorkFactory.class, camelContext.adapt(ExtendedCamelContext.class)::setUnitOfWorkFactory); + registerPropertyForBeanType(registry, RuntimeEndpointRegistry.class, camelContext::setRuntimeEndpointRegistry); + + registerPropertiesForBeanTypes(registry, TypeConverters.class, camelContext.getTypeConverterRegistry()::addTypeConverters); + + final Predicate<EventNotifier> containsEventNotifier = managementStrategy.getEventNotifiers()::contains; + registerPropertiesForBeanTypesWithCondition(registry, EventNotifier.class, containsEventNotifier.negate(), managementStrategy::addEventNotifier); + + registerPropertiesForBeanTypes(registry, EndpointStrategy.class, camelContext.adapt(ExtendedCamelContext.class)::registerEndpointCallback); + + registerPropertyForBeanType(registry, ShutdownStrategy.class, camelContext::setShutdownStrategy); + + final Predicate<InterceptStrategy> containsInterceptStrategy = camelContext.adapt(ExtendedCamelContext.class).getInterceptStrategies()::contains; + registerPropertiesForBeanTypesWithCondition(registry, InterceptStrategy.class, containsInterceptStrategy.negate(), camelContext.adapt(ExtendedCamelContext.class)::addInterceptStrategy); + + final Predicate<LifecycleStrategy> containsLifecycleStrategy = camelContext.getLifecycleStrategies()::contains; + registerPropertiesForBeanTypesWithCondition(registry, LifecycleStrategy.class, containsLifecycleStrategy.negate(), camelContext::addLifecycleStrategy); + + registerPropertiesForBeanTypes(registry, CamelClusterService.class, addServiceToContext(camelContext)); + + // service registry + Map<String, ServiceRegistry> serviceRegistries = registry.findByTypeWithName(ServiceRegistry.class); + if (serviceRegistries != null && !serviceRegistries.isEmpty()) { + for (Map.Entry<String, ServiceRegistry> entry : serviceRegistries.entrySet()) { + ServiceRegistry service = entry.getValue(); + + if (service.getId() == null) { + service.setId(camelContext.getUuidGenerator().generateUuid()); + } + + LOG.info("Using ServiceRegistry with id: {} and implementation: {}", service.getId(), service); + camelContext.addService(service); + } + } + + registerPropertiesForBeanTypes(registry, RoutePolicyFactory.class, camelContext::addRoutePolicyFactory); + + // add SSL context parameters + GlobalSSLContextParametersSupplier sslContextParametersSupplier = getSingleBeanOfType(registry, GlobalSSLContextParametersSupplier.class); + if (sslContextParametersSupplier != null) { + camelContext.setSSLContextParameters(sslContextParametersSupplier.get()); + } + // Health check registry + HealthCheckRegistry healthCheckRegistry = getSingleBeanOfType(registry, HealthCheckRegistry.class); + if (healthCheckRegistry != null) { + healthCheckRegistry.setCamelContext(camelContext); + LOG.info("Using HealthCheckRegistry: {}", healthCheckRegistry); + camelContext.setExtension(HealthCheckRegistry.class, healthCheckRegistry); + } else { + healthCheckRegistry = HealthCheckRegistry.get(camelContext); + healthCheckRegistry.setCamelContext(camelContext); + } + + registerPropertiesForBeanTypes(registry, HealthCheckRepository.class, healthCheckRegistry::addRepository); + + registerPropertyForBeanType(registry, HealthCheckService.class, addServiceToContext(camelContext)); + registerPropertyForBeanType(registry, RouteController.class, camelContext::setRouteController); + registerPropertyForBeanType(registry, UuidGenerator.class, camelContext::setUuidGenerator); + + final Predicate<LogListener> containsLogListener = camelContext.adapt(ExtendedCamelContext.class).getLogListeners()::contains; + registerPropertiesForBeanTypesWithCondition(registry, LogListener.class, containsLogListener.negate(), camelContext.adapt(ExtendedCamelContext.class)::addLogListener); + + registerPropertyForBeanType(registry, ExecutorServiceManager.class, camelContext::setExecutorServiceManager); + + // set the default thread pool profile if defined + initThreadPoolProfiles(registry, camelContext); + } + + private static <T> void registerPropertyForBeanType(final Registry registry, final Class<T> beanType, final Consumer<T> propertySetter) { + T propertyBean = getSingleBeanOfType(registry, beanType); + if (propertyBean == null) { + return; + } + + LOG.info("Using custom {}: {}", beanType.getSimpleName(), propertyBean); + propertySetter.accept(propertyBean); + } + + private static <T> T getSingleBeanOfType(Registry registry, Class<T> type) { + Map<String, T> beans = registry.findByTypeWithName(type); + if (beans.size() == 1) { + return beans.values().iterator().next(); + } else { + return null; + } + } + + private static <T> void registerPropertiesForBeanTypes(final Registry registry, final Class<T> beanType, final Consumer<T> propertySetter) { + registerPropertiesForBeanTypesWithCondition(registry, beanType, b -> true, propertySetter); + } + + private static <T> void registerPropertiesForBeanTypesWithCondition(final Registry registry, final Class<T> beanType, final Predicate<T> condition, + final Consumer<T> propertySetter) { + final Map<String, T> beans = registry.findByTypeWithName(beanType); + if (!ObjectHelper.isNotEmpty(beans)) { + return; + } + final String simpleName = beanType.getSimpleName(); + beans.forEach((name, bean) -> { + if (condition.test(bean)) { + LOG.info("Adding custom {} with id: {} and implementation: {}", simpleName, name, bean); + propertySetter.accept(bean); + } + }); } - + + private static <T> Consumer<T> addServiceToContext(final CamelContext camelContext) { + return service -> { + try { + camelContext.addService(service); + } catch (Exception e) { + throw new RuntimeException("Unable to add service to Camel context", e); + } + }; + } + + private static void initThreadPoolProfiles(Registry registry, CamelContext camelContext) { + Set<String> defaultIds = new HashSet<>(); + + // lookup and use custom profiles from the registry + Map<String, ThreadPoolProfile> profiles = registry.findByTypeWithName(ThreadPoolProfile.class); + if (profiles != null && !profiles.isEmpty()) { + for (Map.Entry<String, ThreadPoolProfile> entry : profiles.entrySet()) { + ThreadPoolProfile profile = entry.getValue(); + // do not add if already added, for instance a tracer that is also an InterceptStrategy class + if (profile.isDefaultProfile()) { + LOG.info("Using custom default ThreadPoolProfile with id: {} and implementation: {}", entry.getKey(), profile); + camelContext.getExecutorServiceManager().setDefaultThreadPoolProfile(profile); + defaultIds.add(entry.getKey()); + } else { + camelContext.getExecutorServiceManager().registerThreadPoolProfile(profile); + } + } + } + + // validate at most one is defined + if (defaultIds.size() > 1) { + throw new IllegalArgumentException("Only exactly one default ThreadPoolProfile is allowed, was " + defaultIds.size() + " ids: " + defaultIds); + } + } + } diff --git a/core/camel-main/src/main/java/org/apache/camel/main/MainSupport.java b/core/camel-main/src/main/java/org/apache/camel/main/MainSupport.java index 2ab9933..dd68816 100644 --- a/core/camel-main/src/main/java/org/apache/camel/main/MainSupport.java +++ b/core/camel-main/src/main/java/org/apache/camel/main/MainSupport.java @@ -21,62 +21,31 @@ import java.io.FileInputStream; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; -import java.util.function.Predicate; import org.apache.camel.CamelContext; import org.apache.camel.Component; import org.apache.camel.ExtendedCamelContext; import org.apache.camel.ProducerTemplate; -import org.apache.camel.TypeConverters; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.cloud.ServiceRegistry; -import org.apache.camel.cluster.CamelClusterService; -import org.apache.camel.health.HealthCheckRegistry; -import org.apache.camel.health.HealthCheckRepository; -import org.apache.camel.health.HealthCheckService; import org.apache.camel.model.Model; import org.apache.camel.model.RouteDefinition; -import org.apache.camel.processor.interceptor.BacklogTracer; -import org.apache.camel.processor.interceptor.HandleFault; -import org.apache.camel.spi.AsyncProcessorAwaitManager; import org.apache.camel.spi.CamelBeanPostProcessor; import org.apache.camel.spi.DataFormat; -import org.apache.camel.spi.EndpointStrategy; -import org.apache.camel.spi.EventFactory; import org.apache.camel.spi.EventNotifier; -import org.apache.camel.spi.ExecutorServiceManager; -import org.apache.camel.spi.InflightRepository; -import org.apache.camel.spi.InterceptStrategy; import org.apache.camel.spi.Language; -import org.apache.camel.spi.LifecycleStrategy; -import org.apache.camel.spi.LogListener; -import org.apache.camel.spi.ManagementObjectNameStrategy; -import org.apache.camel.spi.ManagementStrategy; import org.apache.camel.spi.PropertiesComponent; -import org.apache.camel.spi.Registry; -import org.apache.camel.spi.RouteController; -import org.apache.camel.spi.RoutePolicyFactory; -import org.apache.camel.spi.RuntimeEndpointRegistry; -import org.apache.camel.spi.ShutdownStrategy; -import org.apache.camel.spi.ThreadPoolProfile; -import org.apache.camel.spi.UnitOfWorkFactory; -import org.apache.camel.spi.UuidGenerator; import org.apache.camel.support.LifecycleStrategySupport; import org.apache.camel.support.PropertyBindingSupport; -import org.apache.camel.support.jsse.GlobalSSLContextParametersSupplier; import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.support.service.ServiceSupport; import org.apache.camel.util.FileUtil; @@ -821,9 +790,7 @@ public abstract class MainSupport extends ServiceSupport { // configure the common/default options DefaultConfigurationConfigurer.configure(camelContext, config); - - // additional advanced configuration which is not configured using DefaultConfigurationProperties - afterPropertiesSet(camelContext.getRegistry(), camelContext); + DefaultConfigurationConfigurer.afterPropertiesSet(camelContext, camelContext.getRegistry()); // now configure context with additional properties Properties prop = camelContext.getPropertiesComponent().loadProperties(); @@ -842,165 +809,6 @@ public abstract class MainSupport extends ServiceSupport { setCamelProperties(camelContext, camelContext, properties, true); } - /** - * Performs additional configuration to lookup beans of Camel types to configure - * advanced configurations. - * <p/> - * Similar code in camel-core-xml module in class org.apache.camel.core.xml.AbstractCamelContextFactoryBean - * or in camel-spring-boot module in class org.apache.camel.spring.boot.CamelAutoConfiguration. - */ - static void afterPropertiesSet(Registry registry, CamelContext camelContext) throws Exception { - final ManagementStrategy managementStrategy = camelContext.getManagementStrategy(); - - registerPropertyForBeanType(registry, BacklogTracer.class, bt -> camelContext.setExtension(BacklogTracer.class, bt)); - registerPropertyForBeanType(registry, HandleFault.class, camelContext.adapt(ExtendedCamelContext.class)::addInterceptStrategy); - registerPropertyForBeanType(registry, InflightRepository.class, camelContext::setInflightRepository); - registerPropertyForBeanType(registry, AsyncProcessorAwaitManager.class, camelContext.adapt(ExtendedCamelContext.class)::setAsyncProcessorAwaitManager); - registerPropertyForBeanType(registry, ManagementStrategy.class, camelContext::setManagementStrategy); - registerPropertyForBeanType(registry, ManagementObjectNameStrategy.class, managementStrategy::setManagementObjectNameStrategy); - registerPropertyForBeanType(registry, EventFactory.class, managementStrategy::setEventFactory); - registerPropertyForBeanType(registry, UnitOfWorkFactory.class, camelContext.adapt(ExtendedCamelContext.class)::setUnitOfWorkFactory); - registerPropertyForBeanType(registry, RuntimeEndpointRegistry.class, camelContext::setRuntimeEndpointRegistry); - - registerPropertiesForBeanTypes(registry, TypeConverters.class, camelContext.getTypeConverterRegistry()::addTypeConverters); - - final Predicate<EventNotifier> containsEventNotifier = managementStrategy.getEventNotifiers()::contains; - registerPropertiesForBeanTypesWithCondition(registry, EventNotifier.class, containsEventNotifier.negate(), managementStrategy::addEventNotifier); - - registerPropertiesForBeanTypes(registry, EndpointStrategy.class, camelContext.adapt(ExtendedCamelContext.class)::registerEndpointCallback); - - registerPropertyForBeanType(registry, ShutdownStrategy.class, camelContext::setShutdownStrategy); - - final Predicate<InterceptStrategy> containsInterceptStrategy = camelContext.adapt(ExtendedCamelContext.class).getInterceptStrategies()::contains; - registerPropertiesForBeanTypesWithCondition(registry, InterceptStrategy.class, containsInterceptStrategy.negate(), camelContext.adapt(ExtendedCamelContext.class)::addInterceptStrategy); - - final Predicate<LifecycleStrategy> containsLifecycleStrategy = camelContext.getLifecycleStrategies()::contains; - registerPropertiesForBeanTypesWithCondition(registry, LifecycleStrategy.class, containsLifecycleStrategy.negate(), camelContext::addLifecycleStrategy); - - registerPropertiesForBeanTypes(registry, CamelClusterService.class, addServiceToContext(camelContext)); - - // service registry - Map<String, ServiceRegistry> serviceRegistries = registry.findByTypeWithName(ServiceRegistry.class); - if (serviceRegistries != null && !serviceRegistries.isEmpty()) { - for (Map.Entry<String, ServiceRegistry> entry : serviceRegistries.entrySet()) { - ServiceRegistry service = entry.getValue(); - - if (service.getId() == null) { - service.setId(camelContext.getUuidGenerator().generateUuid()); - } - - LOG.info("Using ServiceRegistry with id: {} and implementation: {}", service.getId(), service); - camelContext.addService(service); - } - } - - registerPropertiesForBeanTypes(registry, RoutePolicyFactory.class, camelContext::addRoutePolicyFactory); - - // add SSL context parameters - GlobalSSLContextParametersSupplier sslContextParametersSupplier = getSingleBeanOfType(registry, GlobalSSLContextParametersSupplier.class); - if (sslContextParametersSupplier != null) { - camelContext.setSSLContextParameters(sslContextParametersSupplier.get()); - } - // Health check registry - HealthCheckRegistry healthCheckRegistry = getSingleBeanOfType(registry, HealthCheckRegistry.class); - if (healthCheckRegistry != null) { - healthCheckRegistry.setCamelContext(camelContext); - LOG.info("Using HealthCheckRegistry: {}", healthCheckRegistry); - camelContext.setExtension(HealthCheckRegistry.class, healthCheckRegistry); - } else { - healthCheckRegistry = HealthCheckRegistry.get(camelContext); - healthCheckRegistry.setCamelContext(camelContext); - } - - registerPropertiesForBeanTypes(registry, HealthCheckRepository.class, healthCheckRegistry::addRepository); - - registerPropertyForBeanType(registry, HealthCheckService.class, addServiceToContext(camelContext)); - registerPropertyForBeanType(registry, RouteController.class, camelContext::setRouteController); - registerPropertyForBeanType(registry, UuidGenerator.class, camelContext::setUuidGenerator); - - final Predicate<LogListener> containsLogListener = camelContext.adapt(ExtendedCamelContext.class).getLogListeners()::contains; - registerPropertiesForBeanTypesWithCondition(registry, LogListener.class, containsLogListener.negate(), camelContext.adapt(ExtendedCamelContext.class)::addLogListener); - - registerPropertyForBeanType(registry, ExecutorServiceManager.class, camelContext::setExecutorServiceManager); - - // set the default thread pool profile if defined - initThreadPoolProfiles(registry, camelContext); - } - - private static void initThreadPoolProfiles(Registry registry, CamelContext camelContext) { - Set<String> defaultIds = new HashSet<>(); - - // lookup and use custom profiles from the registry - Map<String, ThreadPoolProfile> profiles = registry.findByTypeWithName(ThreadPoolProfile.class); - if (profiles != null && !profiles.isEmpty()) { - for (Map.Entry<String, ThreadPoolProfile> entry : profiles.entrySet()) { - ThreadPoolProfile profile = entry.getValue(); - // do not add if already added, for instance a tracer that is also an InterceptStrategy class - if (profile.isDefaultProfile()) { - LOG.info("Using custom default ThreadPoolProfile with id: {} and implementation: {}", entry.getKey(), profile); - camelContext.getExecutorServiceManager().setDefaultThreadPoolProfile(profile); - defaultIds.add(entry.getKey()); - } else { - camelContext.getExecutorServiceManager().registerThreadPoolProfile(profile); - } - } - } - - // validate at most one is defined - if (defaultIds.size() > 1) { - throw new IllegalArgumentException("Only exactly one default ThreadPoolProfile is allowed, was " + defaultIds.size() + " ids: " + defaultIds); - } - } - - private static <T> void registerPropertyForBeanType(final Registry registry, final Class<T> beanType, final Consumer<T> propertySetter) { - T propertyBean = getSingleBeanOfType(registry, beanType); - if (propertyBean == null) { - return; - } - - LOG.info("Using custom {}: {}", beanType.getSimpleName(), propertyBean); - propertySetter.accept(propertyBean); - } - - private static <T> T getSingleBeanOfType(Registry registry, Class<T> type) { - Map<String, T> beans = registry.findByTypeWithName(type); - if (beans.size() == 1) { - return beans.values().iterator().next(); - } else { - return null; - } - } - - private static <T> void registerPropertiesForBeanTypes(final Registry registry, final Class<T> beanType, final Consumer<T> propertySetter) { - registerPropertiesForBeanTypesWithCondition(registry, beanType, b -> true, propertySetter); - } - - private static <T> void registerPropertiesForBeanTypesWithCondition(final Registry registry, final Class<T> beanType, final Predicate<T> condition, - final Consumer<T> propertySetter) { - final Map<String, T> beans = registry.findByTypeWithName(beanType); - if (!ObjectHelper.isNotEmpty(beans)) { - return; - } - - final String simpleName = beanType.getSimpleName(); - beans.forEach((name, bean) -> { - if (condition.test(bean)) { - LOG.info("Adding custom {} with id: {} and implementation: {}", simpleName, name, bean); - propertySetter.accept(bean); - } - }); - } - - private static <T> Consumer<T> addServiceToContext(final CamelContext camelContext) { - return service -> { - try { - camelContext.addService(service); - } catch (Exception e) { - throw new RuntimeException("Unable to add service to Camel context", e); - } - }; - } - protected void autoConfigurationPropertiesComponent(CamelContext camelContext) throws Exception { // load properties Properties prop = camelContext.getPropertiesComponent().loadProperties();