Repository: camel Updated Branches: refs/heads/master 9218c6637 -> 42ac1d046
CAMEL-10113: camel-spring-boot - Add support for looking up @Bean for various things like you can configure with <bean> in XML. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/42ac1d04 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/42ac1d04 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/42ac1d04 Branch: refs/heads/master Commit: 42ac1d046721213ec7df0ba5c297020803de22b5 Parents: 9218c66 Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Jul 1 15:46:24 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Jul 1 15:46:24 2016 +0200 ---------------------------------------------------------------------- .../spring/boot/CamelAutoConfiguration.java | 195 +++++++++++++++++++ .../spring/boot/CamelEventNotifierTest.java | 97 +++++++++ 2 files changed, 292 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/42ac1d04/components/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/CamelAutoConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/CamelAutoConfiguration.java b/components/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/CamelAutoConfiguration.java index a2a760e..bf3058b 100644 --- a/components/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/CamelAutoConfiguration.java +++ b/components/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/CamelAutoConfiguration.java @@ -18,13 +18,35 @@ package org.apache.camel.spring.boot; import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; import org.apache.camel.CamelContext; import org.apache.camel.ConsumerTemplate; import org.apache.camel.Exchange; import org.apache.camel.ProducerTemplate; +import org.apache.camel.TypeConverters; import org.apache.camel.component.properties.PropertiesComponent; import org.apache.camel.component.properties.PropertiesParser; +import org.apache.camel.processor.interceptor.BacklogTracer; +import org.apache.camel.processor.interceptor.HandleFault; +import org.apache.camel.processor.interceptor.TraceFormatter; +import org.apache.camel.processor.interceptor.Tracer; +import org.apache.camel.spi.AsyncProcessorAwaitManager; +import org.apache.camel.spi.EndpointStrategy; +import org.apache.camel.spi.EventFactory; +import org.apache.camel.spi.EventNotifier; +import org.apache.camel.spi.InflightRepository; +import org.apache.camel.spi.InterceptStrategy; +import org.apache.camel.spi.LifecycleStrategy; +import org.apache.camel.spi.ManagementNamingStrategy; +import org.apache.camel.spi.ManagementStrategy; +import org.apache.camel.spi.RoutePolicyFactory; +import org.apache.camel.spi.RuntimeEndpointRegistry; +import org.apache.camel.spi.ShutdownStrategy; +import org.apache.camel.spi.ThreadPoolProfile; +import org.apache.camel.spi.UnitOfWorkFactory; import org.apache.camel.spring.CamelBeanPostProcessor; import org.apache.camel.spring.SpringCamelContext; import org.apache.camel.util.ObjectHelper; @@ -99,6 +121,9 @@ public class CamelAutoConfiguration { camelContext.setPackageScanClassResolver(new FatJarPackageScanClassResolver()); + // additional advanced configuration which is not configured using CamelConfigurationProperties + afterPropertiesSet(applicationContext, camelContext); + return camelContext; } @@ -165,4 +190,174 @@ public class CamelAutoConfiguration { return processor; } + /** + * Performs additional configuration to lookup beans of Camel types to configure + * advanced configurations. + * <p/> + * Similar code in camel-core-xml module in class org.apache.camel.core.xml.AbstractCamelContextFactoryBean. + */ + void afterPropertiesSet(ApplicationContext applicationContext, CamelContext camelContext) { + Tracer tracer = getSingleBeanOfType(applicationContext, Tracer.class); + if (tracer != null) { + // use formatter if there is a TraceFormatter bean defined + TraceFormatter formatter = getSingleBeanOfType(applicationContext, TraceFormatter.class); + if (formatter != null) { + tracer.setFormatter(formatter); + } + LOG.info("Using custom Tracer: {}", tracer); + camelContext.addInterceptStrategy(tracer); + } + BacklogTracer backlogTracer = getSingleBeanOfType(applicationContext, BacklogTracer.class); + if (backlogTracer != null) { + LOG.info("Using custom BacklogTracer: {}", backlogTracer); + camelContext.addInterceptStrategy(backlogTracer); + } + HandleFault handleFault = getSingleBeanOfType(applicationContext, HandleFault.class); + if (handleFault != null) { + LOG.info("Using custom HandleFault: {}", handleFault); + camelContext.addInterceptStrategy(handleFault); + } + InflightRepository inflightRepository = getSingleBeanOfType(applicationContext, InflightRepository.class); + if (inflightRepository != null) { + LOG.info("Using custom InflightRepository: {}", inflightRepository); + camelContext.setInflightRepository(inflightRepository); + } + AsyncProcessorAwaitManager asyncProcessorAwaitManager = getSingleBeanOfType(applicationContext, AsyncProcessorAwaitManager.class); + if (asyncProcessorAwaitManager != null) { + LOG.info("Using custom AsyncProcessorAwaitManager: {}", asyncProcessorAwaitManager); + camelContext.setAsyncProcessorAwaitManager(asyncProcessorAwaitManager); + } + ManagementStrategy managementStrategy = getSingleBeanOfType(applicationContext, ManagementStrategy.class); + if (managementStrategy != null) { + LOG.info("Using custom ManagementStrategy: {}", managementStrategy); + camelContext.setManagementStrategy(managementStrategy); + } + ManagementNamingStrategy managementNamingStrategy = getSingleBeanOfType(applicationContext, ManagementNamingStrategy.class); + if (managementNamingStrategy != null) { + LOG.info("Using custom ManagementNamingStrategy: {}", managementNamingStrategy); + camelContext.getManagementStrategy().setManagementNamingStrategy(managementNamingStrategy); + } + EventFactory eventFactory = getSingleBeanOfType(applicationContext, EventFactory.class); + if (eventFactory != null) { + LOG.info("Using custom EventFactory: {}", eventFactory); + camelContext.getManagementStrategy().setEventFactory(eventFactory); + } + UnitOfWorkFactory unitOfWorkFactory = getSingleBeanOfType(applicationContext, UnitOfWorkFactory.class); + if (unitOfWorkFactory != null) { + LOG.info("Using custom UnitOfWorkFactory: {}", unitOfWorkFactory); + camelContext.setUnitOfWorkFactory(unitOfWorkFactory); + } + RuntimeEndpointRegistry runtimeEndpointRegistry = getSingleBeanOfType(applicationContext, RuntimeEndpointRegistry.class); + if (runtimeEndpointRegistry != null) { + LOG.info("Using custom RuntimeEndpointRegistry: {}", runtimeEndpointRegistry); + camelContext.setRuntimeEndpointRegistry(runtimeEndpointRegistry); + } + // custom type converters defined as <bean>s + Map<String, TypeConverters> typeConverters = applicationContext.getBeansOfType(TypeConverters.class); + if (typeConverters != null && !typeConverters.isEmpty()) { + for (Map.Entry<String, TypeConverters> entry : typeConverters.entrySet()) { + TypeConverters converter = entry.getValue(); + LOG.info("Adding custom TypeConverters with id: {} and implementation: {}", entry.getKey(), converter); + camelContext.getTypeConverterRegistry().addTypeConverters(converter); + } + } + // set the event notifier strategies if defined + Map<String, EventNotifier> eventNotifiers = applicationContext.getBeansOfType(EventNotifier.class); + if (eventNotifiers != null && !eventNotifiers.isEmpty()) { + for (Map.Entry<String, EventNotifier> entry : eventNotifiers.entrySet()) { + EventNotifier notifier = entry.getValue(); + // do not add if already added, for instance a tracer that is also an InterceptStrategy class + if (!camelContext.getManagementStrategy().getEventNotifiers().contains(notifier)) { + LOG.info("Using custom EventNotifier with id: {} and implementation: {}", entry.getKey(), notifier); + camelContext.getManagementStrategy().addEventNotifier(notifier); + } + } + } + // set endpoint strategies if defined + Map<String, EndpointStrategy> endpointStrategies = applicationContext.getBeansOfType(EndpointStrategy.class); + if (endpointStrategies != null && !endpointStrategies.isEmpty()) { + for (Map.Entry<String, EndpointStrategy> entry : endpointStrategies.entrySet()) { + EndpointStrategy strategy = entry.getValue(); + LOG.info("Using custom EndpointStrategy with id: {} and implementation: {}", entry.getKey(), strategy); + camelContext.addRegisterEndpointCallback(strategy); + } + } + // shutdown + ShutdownStrategy shutdownStrategy = getSingleBeanOfType(applicationContext, ShutdownStrategy.class); + if (shutdownStrategy != null) { + LOG.info("Using custom ShutdownStrategy: " + shutdownStrategy); + camelContext.setShutdownStrategy(shutdownStrategy); + } + // add global interceptors + Map<String, InterceptStrategy> interceptStrategies = applicationContext.getBeansOfType(InterceptStrategy.class); + if (interceptStrategies != null && !interceptStrategies.isEmpty()) { + for (Map.Entry<String, InterceptStrategy> entry : interceptStrategies.entrySet()) { + InterceptStrategy strategy = entry.getValue(); + // do not add if already added, for instance a tracer that is also an InterceptStrategy class + if (!camelContext.getInterceptStrategies().contains(strategy)) { + LOG.info("Using custom InterceptStrategy with id: {} and implementation: {}", entry.getKey(), strategy); + camelContext.addInterceptStrategy(strategy); + } + } + } + // set the lifecycle strategy if defined + Map<String, LifecycleStrategy> lifecycleStrategies = applicationContext.getBeansOfType(LifecycleStrategy.class); + if (lifecycleStrategies != null && !lifecycleStrategies.isEmpty()) { + for (Map.Entry<String, LifecycleStrategy> entry : lifecycleStrategies.entrySet()) { + LifecycleStrategy strategy = entry.getValue(); + // do not add if already added, for instance a tracer that is also an InterceptStrategy class + if (!camelContext.getLifecycleStrategies().contains(strategy)) { + LOG.info("Using custom LifecycleStrategy with id: {} and implementation: {}", entry.getKey(), strategy); + camelContext.addLifecycleStrategy(strategy); + } + } + } + // add route policy factories + Map<String, RoutePolicyFactory> routePolicyFactories = applicationContext.getBeansOfType(RoutePolicyFactory.class); + if (routePolicyFactories != null && !routePolicyFactories.isEmpty()) { + for (Map.Entry<String, RoutePolicyFactory> entry : routePolicyFactories.entrySet()) { + RoutePolicyFactory factory = entry.getValue(); + LOG.info("Using custom RoutePolicyFactory with id: {} and implementation: {}", entry.getKey(), factory); + camelContext.addRoutePolicyFactory(factory); + } + } + + // set the default thread pool profile if defined + initThreadPoolProfiles(applicationContext, camelContext); + } + + private void initThreadPoolProfiles(ApplicationContext applicationContext, CamelContext camelContext) { + Set<String> defaultIds = new HashSet<String>(); + + // lookup and use custom profiles from the registry + Map<String, ThreadPoolProfile> profiles = applicationContext.getBeansOfType(ThreadPoolProfile.class); + if (profiles != null && !profiles.isEmpty()) { + for (Map.Entry<String, ThreadPoolProfile> entry : profiles.entrySet()) { + ThreadPoolProfile profile = entry.getValue(); + // do not add if already added, for instance a tracer that is also an InterceptStrategy class + if (profile.isDefaultProfile()) { + LOG.info("Using custom default ThreadPoolProfile with id: {} and implementation: {}", entry.getKey(), profile); + camelContext.getExecutorServiceManager().setDefaultThreadPoolProfile(profile); + defaultIds.add(entry.getKey()); + } else { + camelContext.getExecutorServiceManager().registerThreadPoolProfile(profile); + } + } + } + + // validate at most one is defined + if (defaultIds.size() > 1) { + throw new IllegalArgumentException("Only exactly one default ThreadPoolProfile is allowed, was " + defaultIds.size() + " ids: " + defaultIds); + } + } + + private <T> T getSingleBeanOfType(ApplicationContext applicationContext, Class<T> type) { + Map<String, T> beans = applicationContext.getBeansOfType(type); + if (beans.size() == 1) { + return beans.values().iterator().next(); + } else { + return null; + } + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/42ac1d04/components/camel-spring-boot/src/test/java/org/apache/camel/spring/boot/CamelEventNotifierTest.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-boot/src/test/java/org/apache/camel/spring/boot/CamelEventNotifierTest.java b/components/camel-spring-boot/src/test/java/org/apache/camel/spring/boot/CamelEventNotifierTest.java new file mode 100644 index 0000000..1c313d4 --- /dev/null +++ b/components/camel-spring-boot/src/test/java/org/apache/camel/spring/boot/CamelEventNotifierTest.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spring.boot; + +import java.util.EventObject; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.camel.CamelContext; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.spi.EventNotifier; +import org.apache.camel.support.EventNotifierSupport; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.SpringApplicationConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +@RunWith(SpringJUnit4ClassRunner.class) +@EnableAutoConfiguration +@SpringApplicationConfiguration(classes = CamelEventNotifierTest.class) +public class CamelEventNotifierTest extends Assert { + + @Autowired + CamelContext camelContext; + + @Autowired + ProducerTemplate producerTemplate; + + @Bean + RouteBuilder routeBuilder() { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").to("mock:result"); + } + }; + } + + @Test + public void testEventNotifier() throws InterruptedException { + MockEndpoint mockEndpoint = camelContext.getEndpoint("mock:result", MockEndpoint.class); + mockEndpoint.expectedMessageCount(1); + + producerTemplate.sendBody("direct:start", "Hello World"); + + mockEndpoint.assertIsSatisfied(); + + MyEventNotifier notifier = (MyEventNotifier) camelContext.getManagementStrategy().getEventNotifiers().get(0); + assertNotNull(notifier); + assertTrue(notifier.getCount() > 0); + } + + @Bean + public EventNotifier myEventNotifier() { + return new MyEventNotifier(); + } + + private class MyEventNotifier extends EventNotifierSupport { + + private final AtomicInteger counter = new AtomicInteger(); + + @Override + public void notify(EventObject event) throws Exception { + counter.incrementAndGet(); + } + + @Override + public boolean isEnabled(EventObject event) { + return true; + } + + public int getCount() { + return counter.get(); + } + + } + +}