This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch bean in repository https://gitbox.apache.org/repos/asf/camel.git
commit 129cb0e23b26daf0c19859ef7a9053d7b8c81d89 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Apr 25 09:13:37 2019 +0200 CAMEL-13449: camel3 - Move bean component out of camel-core --- .../main/java/org/apache/camel/CamelContext.java | 6 ++ .../camel/spi/AnnotationBasedProcessorFactory.java | 45 +++++++++++ .../apache/camel/component/bean/MethodInfo.java | 50 ++----------- .../apache/camel/impl/AbstractCamelContext.java | 8 +- .../DefaultAnnotationBasedProcessorFactory.java | 86 ++++++++++++++++++++++ .../DefaultDeferServiceFactory.java | 20 +---- 6 files changed, 152 insertions(+), 63 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/CamelContext.java b/core/camel-api/src/main/java/org/apache/camel/CamelContext.java index abc2029..e6b4a66 100644 --- a/core/camel-api/src/main/java/org/apache/camel/CamelContext.java +++ b/core/camel-api/src/main/java/org/apache/camel/CamelContext.java @@ -23,6 +23,7 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; +import org.apache.camel.spi.AnnotationBasedProcessorFactory; import org.apache.camel.spi.AsyncProcessorAwaitManager; import org.apache.camel.spi.CamelBeanPostProcessor; import org.apache.camel.spi.CamelContextNameStrategy; @@ -1479,4 +1480,9 @@ public interface CamelContext extends SuspendableService, RuntimeConfiguration { */ DeferServiceFactory getDeferServiceFactory(); + /** + * Gets the {@link AnnotationBasedProcessorFactory} to use. + */ + AnnotationBasedProcessorFactory getAnnotationBasedProcessorFactory(); + } diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/AnnotationBasedProcessorFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/AnnotationBasedProcessorFactory.java new file mode 100644 index 0000000..856d7a0 --- /dev/null +++ b/core/camel-api/src/main/java/org/apache/camel/spi/AnnotationBasedProcessorFactory.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spi; + +import org.apache.camel.CamelContext; +import org.apache.camel.DynamicRouter; +import org.apache.camel.Processor; +import org.apache.camel.RecipientList; +import org.apache.camel.RoutingSlip; + +/** + * Factory to create {@link Processor} for annotation based EIPs. + */ +public interface AnnotationBasedProcessorFactory { + + /** + * Creates dynamic router processor from the configured annotation. + */ + Processor createDynamicRouter(CamelContext camelContext, DynamicRouter annotation); + + /** + * Creates recipient list processor from the configured annotation. + */ + Processor createRecipientList(CamelContext camelContext, RecipientList annotation); + + /** + * Creates routing slip processor from the configured annotation. + */ + Processor createRoutingSlip(CamelContext camelContext, RoutingSlip annotation); + +} diff --git a/core/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java b/core/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java index 6b9b0b0..317acdf 100644 --- a/core/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java +++ b/core/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java @@ -30,9 +30,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CompletionStage; -import java.util.concurrent.ExecutorService; -import org.apache.camel.AggregationStrategy; import org.apache.camel.AsyncCallback; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; @@ -42,15 +40,13 @@ import org.apache.camel.ExpressionEvaluationException; import org.apache.camel.Message; import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.Pattern; -import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; import org.apache.camel.RuntimeExchangeException; import org.apache.camel.StreamCache; -// TODO: Factory SPI to create these processors which are loaded via FactoryFinder +// TODO: avoid these imports import org.apache.camel.processor.DynamicRouter; import org.apache.camel.processor.RecipientList; import org.apache.camel.processor.RoutingSlip; -import org.apache.camel.support.CamelContextHelper; import org.apache.camel.support.DefaultMessage; import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.ExpressionAdapter; @@ -63,6 +59,7 @@ import org.slf4j.LoggerFactory; import static org.apache.camel.util.ObjectHelper.asString; + /** * Information about a method to be used for invocation. */ @@ -130,11 +127,7 @@ public class MethodInfo { org.apache.camel.RoutingSlip routingSlipAnnotation = (org.apache.camel.RoutingSlip)collectedMethodAnnotation.get(org.apache.camel.RoutingSlip.class); if (routingSlipAnnotation != null && matchContext(routingSlipAnnotation.context())) { - routingSlip = new RoutingSlip(camelContext); - routingSlip.setDelimiter(routingSlipAnnotation.delimiter()); - routingSlip.setIgnoreInvalidEndpoints(routingSlipAnnotation.ignoreInvalidEndpoints()); - routingSlip.setCacheSize(routingSlipAnnotation.cacheSize()); - + routingSlip = (RoutingSlip) camelContext.getAnnotationBasedProcessorFactory().createRoutingSlip(camelContext, routingSlipAnnotation); // add created routingSlip as a service so we have its lifecycle managed try { camelContext.addService(routingSlip); @@ -147,10 +140,7 @@ public class MethodInfo { (org.apache.camel.DynamicRouter)collectedMethodAnnotation.get(org.apache.camel.DynamicRouter.class); if (dynamicRouterAnnotation != null && matchContext(dynamicRouterAnnotation.context())) { - dynamicRouter = new DynamicRouter(camelContext); - dynamicRouter.setDelimiter(dynamicRouterAnnotation.delimiter()); - dynamicRouter.setIgnoreInvalidEndpoints(dynamicRouterAnnotation.ignoreInvalidEndpoints()); - dynamicRouter.setCacheSize(dynamicRouterAnnotation.cacheSize()); + dynamicRouter = (DynamicRouter) camelContext.getAnnotationBasedProcessorFactory().createDynamicRouter(camelContext, dynamicRouterAnnotation); // add created dynamicRouter as a service so we have its lifecycle managed try { camelContext.addService(dynamicRouter); @@ -163,37 +153,7 @@ public class MethodInfo { (org.apache.camel.RecipientList)collectedMethodAnnotation.get(org.apache.camel.RecipientList.class); if (recipientListAnnotation != null && matchContext(recipientListAnnotation.context())) { - recipientList = new RecipientList(camelContext, recipientListAnnotation.delimiter()); - recipientList.setStopOnException(recipientListAnnotation.stopOnException()); - recipientList.setStopOnAggregateException(recipientListAnnotation.stopOnAggregateException()); - recipientList.setIgnoreInvalidEndpoints(recipientListAnnotation.ignoreInvalidEndpoints()); - recipientList.setParallelProcessing(recipientListAnnotation.parallelProcessing()); - recipientList.setParallelAggregate(recipientListAnnotation.parallelAggregate()); - recipientList.setStreaming(recipientListAnnotation.streaming()); - recipientList.setTimeout(recipientListAnnotation.timeout()); - recipientList.setShareUnitOfWork(recipientListAnnotation.shareUnitOfWork()); - - if (org.apache.camel.util.ObjectHelper.isNotEmpty(recipientListAnnotation.executorServiceRef())) { - ExecutorService executor = camelContext.getExecutorServiceManager().newDefaultThreadPool(this, recipientListAnnotation.executorServiceRef()); - recipientList.setExecutorService(executor); - } - - if (recipientListAnnotation.parallelProcessing() && recipientList.getExecutorService() == null) { - // we are running in parallel so we need a thread pool - ExecutorService executor = camelContext.getExecutorServiceManager().newDefaultThreadPool(this, "@RecipientList"); - recipientList.setExecutorService(executor); - } - - if (org.apache.camel.util.ObjectHelper.isNotEmpty(recipientListAnnotation.strategyRef())) { - AggregationStrategy strategy = CamelContextHelper.mandatoryLookup(camelContext, recipientListAnnotation.strategyRef(), AggregationStrategy.class); - recipientList.setAggregationStrategy(strategy); - } - - if (org.apache.camel.util.ObjectHelper.isNotEmpty(recipientListAnnotation.onPrepareRef())) { - Processor onPrepare = CamelContextHelper.mandatoryLookup(camelContext, recipientListAnnotation.onPrepareRef(), Processor.class); - recipientList.setOnPrepare(onPrepare); - } - + recipientList = (RecipientList) camelContext.getAnnotationBasedProcessorFactory().createRecipientList(camelContext, recipientListAnnotation); // add created recipientList as a service so we have its lifecycle managed try { camelContext.addService(recipientList); diff --git a/core/camel-core/src/main/java/org/apache/camel/impl/AbstractCamelContext.java b/core/camel-core/src/main/java/org/apache/camel/impl/AbstractCamelContext.java index c8813b7..98e84bb 100644 --- a/core/camel-core/src/main/java/org/apache/camel/impl/AbstractCamelContext.java +++ b/core/camel-core/src/main/java/org/apache/camel/impl/AbstractCamelContext.java @@ -98,12 +98,12 @@ import org.apache.camel.model.rest.RestDefinition; import org.apache.camel.model.rest.RestsDefinition; import org.apache.camel.model.transformer.TransformerDefinition; import org.apache.camel.model.validator.ValidatorDefinition; -import org.apache.camel.processor.DefaultDeferServiceFactory; import org.apache.camel.processor.MulticastProcessor; import org.apache.camel.processor.interceptor.Debug; import org.apache.camel.processor.interceptor.HandleFault; import org.apache.camel.reifier.RouteReifier; import org.apache.camel.runtimecatalog.RuntimeCamelCatalog; +import org.apache.camel.spi.AnnotationBasedProcessorFactory; import org.apache.camel.spi.AsyncProcessorAwaitManager; import org.apache.camel.spi.CamelBeanPostProcessor; import org.apache.camel.spi.CamelContextNameStrategy; @@ -275,6 +275,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod private volatile RouteController routeController; private volatile ScheduledExecutorService errorHandlerExecutorService; private final DeferServiceFactory deferServiceFactory = new DefaultDeferServiceFactory(); + private final AnnotationBasedProcessorFactory annotationBasedProcessorFactory = new DefaultAnnotationBasedProcessorFactory(); private TransformerRegistry<TransformerKey> transformerRegistry; private ValidatorRegistry<ValidatorKey> validatorRegistry; @@ -3930,6 +3931,11 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Mod return deferServiceFactory; } + @Override + public AnnotationBasedProcessorFactory getAnnotationBasedProcessorFactory() { + return annotationBasedProcessorFactory; + } + protected Map<String, RouteService> getRouteServices() { return routeServices; } diff --git a/core/camel-core/src/main/java/org/apache/camel/impl/DefaultAnnotationBasedProcessorFactory.java b/core/camel-core/src/main/java/org/apache/camel/impl/DefaultAnnotationBasedProcessorFactory.java new file mode 100644 index 0000000..512da98 --- /dev/null +++ b/core/camel-core/src/main/java/org/apache/camel/impl/DefaultAnnotationBasedProcessorFactory.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import java.util.concurrent.ExecutorService; + +import org.apache.camel.AggregationStrategy; +import org.apache.camel.CamelContext; +import org.apache.camel.DynamicRouter; +import org.apache.camel.Processor; +import org.apache.camel.RecipientList; +import org.apache.camel.RoutingSlip; +import org.apache.camel.spi.AnnotationBasedProcessorFactory; +import org.apache.camel.support.CamelContextHelper; +import org.apache.camel.util.ObjectHelper; + +public final class DefaultAnnotationBasedProcessorFactory implements AnnotationBasedProcessorFactory { + + @Override + public Processor createDynamicRouter(CamelContext camelContext, DynamicRouter annotation) { + org.apache.camel.processor.DynamicRouter dynamicRouter = new org.apache.camel.processor.DynamicRouter(camelContext); + dynamicRouter.setDelimiter(annotation.delimiter()); + dynamicRouter.setIgnoreInvalidEndpoints(annotation.ignoreInvalidEndpoints()); + dynamicRouter.setCacheSize(annotation.cacheSize()); + return dynamicRouter; + } + + @Override + public Processor createRecipientList(CamelContext camelContext, RecipientList annotation) { + org.apache.camel.processor.RecipientList recipientList = new org.apache.camel.processor.RecipientList(camelContext, annotation.delimiter()); + recipientList.setStopOnException(annotation.stopOnException()); + recipientList.setStopOnAggregateException(annotation.stopOnAggregateException()); + recipientList.setIgnoreInvalidEndpoints(annotation.ignoreInvalidEndpoints()); + recipientList.setParallelProcessing(annotation.parallelProcessing()); + recipientList.setParallelAggregate(annotation.parallelAggregate()); + recipientList.setStreaming(annotation.streaming()); + recipientList.setTimeout(annotation.timeout()); + recipientList.setShareUnitOfWork(annotation.shareUnitOfWork()); + + if (ObjectHelper.isNotEmpty(annotation.executorServiceRef())) { + ExecutorService executor = camelContext.getExecutorServiceManager().newDefaultThreadPool(this, annotation.executorServiceRef()); + recipientList.setExecutorService(executor); + } + + if (annotation.parallelProcessing() && recipientList.getExecutorService() == null) { + // we are running in parallel so we need a thread pool + ExecutorService executor = camelContext.getExecutorServiceManager().newDefaultThreadPool(this, "@RecipientList"); + recipientList.setExecutorService(executor); + } + + if (ObjectHelper.isNotEmpty(annotation.strategyRef())) { + AggregationStrategy strategy = CamelContextHelper.mandatoryLookup(camelContext, annotation.strategyRef(), AggregationStrategy.class); + recipientList.setAggregationStrategy(strategy); + } + + if (ObjectHelper.isNotEmpty(annotation.onPrepareRef())) { + Processor onPrepare = CamelContextHelper.mandatoryLookup(camelContext, annotation.onPrepareRef(), Processor.class); + recipientList.setOnPrepare(onPrepare); + } + + return recipientList; + } + + @Override + public Processor createRoutingSlip(CamelContext camelContext, RoutingSlip annotation) { + org.apache.camel.processor.RoutingSlip routingSlip = new org.apache.camel.processor.RoutingSlip(camelContext); + routingSlip.setDelimiter(annotation.delimiter()); + routingSlip.setIgnoreInvalidEndpoints(annotation.ignoreInvalidEndpoints()); + routingSlip.setCacheSize(annotation.cacheSize()); + return routingSlip; + } +} diff --git a/core/camel-core/src/main/java/org/apache/camel/processor/DefaultDeferServiceFactory.java b/core/camel-core/src/main/java/org/apache/camel/impl/DefaultDeferServiceFactory.java similarity index 60% rename from core/camel-core/src/main/java/org/apache/camel/processor/DefaultDeferServiceFactory.java rename to core/camel-core/src/main/java/org/apache/camel/impl/DefaultDeferServiceFactory.java index 7145d63..34207df 100644 --- a/core/camel-core/src/main/java/org/apache/camel/processor/DefaultDeferServiceFactory.java +++ b/core/camel-core/src/main/java/org/apache/camel/impl/DefaultDeferServiceFactory.java @@ -14,30 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.processor; +package org.apache.camel.impl; import org.apache.camel.Endpoint; import org.apache.camel.Producer; -import org.apache.camel.impl.DeferProducer; +import org.apache.camel.processor.EventNotifierProducer; +import org.apache.camel.processor.UnitOfWorkProducer; import org.apache.camel.spi.DeferServiceFactory; -/** - * Factory to create services such as {@link Producer}s - * and defer starting the created service, until {@link org.apache.camel.CamelContext} has been started. - */ public final class DefaultDeferServiceFactory implements DeferServiceFactory { - /** - * Creates the {@link Producer} which is deferred started until {@link org.apache.camel.CamelContext} is being started. - * <p/> - * When the producer is started, it re-lookup the endpoint to capture any changes such as the endpoint has been intercepted. - * This allows the producer to react and send messages to the updated endpoint. - * - * @param endpoint the endpoint - * @return the producer which will be deferred started until {@link org.apache.camel.CamelContext} has been started - * @throws Exception can be thrown if there is an error starting the producer - * @see org.apache.camel.impl.DeferProducer - */ @Override public Producer createProducer(Endpoint endpoint) throws Exception { Producer producer = new DeferProducer(endpoint);