This is an automated email from the ASF dual-hosted git repository. gnodet pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit bfe9336ce5a9c2c7ba3728cda74136ddc8432aa7 Author: Guillaume Nodet <gno...@gmail.com> AuthorDate: Tue May 28 23:17:24 2019 +0200 [CAMEL-13564] Move DefaultChannel to camel-base --- .../src/main/java/org/apache/camel/Channel.java | 45 ++----- .../camel/processor/channel/DefaultChannel.java | 136 ++++++++------------- .../java/org/apache/camel/model/ModelChannel.java | 56 --------- .../org/apache/camel/reifier/ProcessorReifier.java | 69 +++++++---- .../RandomLoadBalanceJavaDSLBuilderTest.java | 2 +- 5 files changed, 105 insertions(+), 203 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/Channel.java b/core/camel-api/src/main/java/org/apache/camel/Channel.java index 5ffe260..0a73529 100644 --- a/core/camel-api/src/main/java/org/apache/camel/Channel.java +++ b/core/camel-api/src/main/java/org/apache/camel/Channel.java @@ -29,20 +29,6 @@ import org.apache.camel.spi.RouteContext; public interface Channel extends AsyncProcessor, Navigate<Processor> { /** - * Sets the processor that the channel should route the {@link Exchange} to. - * - * @param next the next processor - */ - void setNextProcessor(Processor next); - - /** - * Sets the {@link org.apache.camel.processor.ErrorHandler} this Channel uses. - * - * @param errorHandler the error handler - */ - void setErrorHandler(Processor errorHandler); - - /** * Gets the {@link org.apache.camel.processor.ErrorHandler} this Channel uses. * * @return the error handler, or <tt>null</tt> if no error handler is used. @@ -50,29 +36,6 @@ public interface Channel extends AsyncProcessor, Navigate<Processor> { Processor getErrorHandler(); /** - * Adds a {@link org.apache.camel.spi.InterceptStrategy} to apply each {@link Exchange} before - * its routed to the next {@link Processor}. - * - * @param strategy the intercept strategy - */ - void addInterceptStrategy(InterceptStrategy strategy); - - /** - * Adds a list of {@link org.apache.camel.spi.InterceptStrategy} to apply each {@link Exchange} before - * its routed to the next {@link Processor}. - * - * @param strategy list of strategies - */ - void addInterceptStrategies(List<InterceptStrategy> strategy); - - /** - * Gets the list of {@link org.apache.camel.spi.InterceptStrategy} registered to this Channel. - * - * @return list of strategies, returns an empty list if no strategies is registered. - */ - List<InterceptStrategy> getInterceptStrategies(); - - /** * Gets the wrapped output that at runtime should be delegated to. * * @return the output to route the {@link Exchange} to @@ -99,4 +62,12 @@ public interface Channel extends AsyncProcessor, Navigate<Processor> { * @return the route context */ RouteContext getRouteContext(); + + /** + * Gets the definition of the next processor + * + * @return the processor definition + */ + NamedNode getProcessorDefinition(); + } diff --git a/core/camel-core/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java b/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java similarity index 73% rename from core/camel-core/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java rename to core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java index bf77f86..d519a9b 100644 --- a/core/camel-core/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java @@ -26,20 +26,13 @@ import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; import org.apache.camel.Channel; import org.apache.camel.Exchange; +import org.apache.camel.NamedNode; import org.apache.camel.Processor; -import org.apache.camel.model.ModelChannel; -import org.apache.camel.model.OnCompletionDefinition; -import org.apache.camel.model.OnExceptionDefinition; -import org.apache.camel.model.ProcessorDefinition; -import org.apache.camel.model.ProcessorDefinitionHelper; -import org.apache.camel.model.RouteDefinition; -import org.apache.camel.model.RouteDefinitionHelper; import org.apache.camel.processor.CamelInternalProcessor; import org.apache.camel.processor.WrapProcessor; import org.apache.camel.processor.errorhandler.RedeliveryErrorHandler; import org.apache.camel.processor.interceptor.BacklogDebugger; import org.apache.camel.processor.interceptor.BacklogTracer; -import org.apache.camel.spi.CamelInternalProcessorAdvice; import org.apache.camel.spi.InterceptStrategy; import org.apache.camel.spi.ManagementInterceptStrategy; import org.apache.camel.spi.MessageHistoryFactory; @@ -57,23 +50,18 @@ import org.apache.camel.support.service.ServiceHelper; * {@link Exchange} in the route graph, as we have a {@link Channel} between each and every node * in the graph. */ -public class DefaultChannel extends CamelInternalProcessor implements ModelChannel { +public class DefaultChannel extends CamelInternalProcessor implements Channel { - private final List<InterceptStrategy> interceptors = new ArrayList<>(); private Processor errorHandler; // the next processor (non wrapped) private Processor nextProcessor; // the real output to invoke that has been wrapped private Processor output; - private ProcessorDefinition<?> definition; - private ProcessorDefinition<?> childDefinition; + private NamedNode definition; private ManagementInterceptStrategy.InstrumentationProcessor<?> instrumentationProcessor; private CamelContext camelContext; private RouteContext routeContext; - - public void setNextProcessor(Processor next) { - this.nextProcessor = next; - } + private boolean routeScoped = true; public Processor getOutput() { // the errorHandler is already decorated with interceptors @@ -107,15 +95,11 @@ public class DefaultChannel extends CamelInternalProcessor implements ModelChann return nextProcessor; } - public boolean hasInterceptorStrategy(Class<?> type) { - for (InterceptStrategy strategy : interceptors) { - if (type.isInstance(strategy)) { - return true; - } - } - return false; - } - + /** + * Sets the {@link org.apache.camel.processor.ErrorHandler} this Channel uses. + * + * @param errorHandler the error handler + */ public void setErrorHandler(Processor errorHandler) { this.errorHandler = errorHandler; } @@ -124,26 +108,10 @@ public class DefaultChannel extends CamelInternalProcessor implements ModelChann return errorHandler; } - public void addInterceptStrategy(InterceptStrategy strategy) { - interceptors.add(strategy); - } - - public void addInterceptStrategies(List<InterceptStrategy> strategies) { - interceptors.addAll(strategies); - } - - public List<InterceptStrategy> getInterceptStrategies() { - return interceptors; - } - - public ProcessorDefinition<?> getProcessorDefinition() { + public NamedNode getProcessorDefinition() { return definition; } - public void setChildDefinition(ProcessorDefinition<?> childDefinition) { - this.childDefinition = childDefinition; - } - public RouteContext getRouteContext() { return routeContext; } @@ -157,7 +125,7 @@ public class DefaultChannel extends CamelInternalProcessor implements ModelChann @Override protected void doStop() throws Exception { - if (!isContextScoped()) { + if (isRouteScoped()) { // only stop services if not context scoped (as context scoped is reused by others) ServiceHelper.stopService(output, errorHandler); } @@ -168,68 +136,61 @@ public class DefaultChannel extends CamelInternalProcessor implements ModelChann ServiceHelper.stopAndShutdownServices(output, errorHandler); } - private boolean isContextScoped() { - if (definition instanceof OnExceptionDefinition) { - return !((OnExceptionDefinition) definition).isRouteScoped(); - } else if (definition instanceof OnCompletionDefinition) { - return !((OnCompletionDefinition) definition).isRouteScoped(); - } - - return false; + public boolean isRouteScoped() { + return routeScoped; } - public void initChannel(ProcessorDefinition<?> outputDefinition, RouteContext routeContext) throws Exception { + /** + * Initializes the channel. + * If the initialized output definition contained outputs (children) then + * the childDefinition will be set so we can leverage fine grained tracing + * + * @param routeContext the route context + * @param definition the route definition the {@link Channel} represents + * @param childDefinition the child definition + * @throws Exception is thrown if some error occurred + */ + public void initChannel(RouteContext routeContext, + NamedNode definition, + NamedNode childDefinition, + List<InterceptStrategy> interceptors, + Processor nextProcessor, + NamedNode route, + boolean first, + boolean routeScoped) throws Exception { this.routeContext = routeContext; - this.definition = outputDefinition; + this.definition = definition; this.camelContext = routeContext.getCamelContext(); + this.nextProcessor = nextProcessor; + this.routeScoped = routeScoped; - Processor target = nextProcessor; - Processor next; - - // init CamelContextAware as early as possible on target - if (target instanceof CamelContextAware) { - ((CamelContextAware) target).setCamelContext(camelContext); + // init CamelContextAware as early as possible on nextProcessor + if (nextProcessor instanceof CamelContextAware) { + ((CamelContextAware) nextProcessor).setCamelContext(camelContext); } // the definition to wrap should be the fine grained, // so if a child is set then use it, if not then its the original output used - ProcessorDefinition<?> targetOutputDef = childDefinition != null ? childDefinition : outputDefinition; + NamedNode targetOutputDef = childDefinition != null ? childDefinition : definition; log.debug("Initialize channel for target: '{}'", targetOutputDef); - // fix parent/child relationship. This will be the case of the routes has been - // defined using XML DSL or end user may have manually assembled a route from the model. - // Background note: parent/child relationship is assembled on-the-fly when using Java DSL (fluent builders) - // where as when using XML DSL (JAXB) then it fixed after, but if people are using custom interceptors - // then we need to fix the parent/child relationship beforehand, and thus we can do it here - // ideally we need the design time route -> runtime route to be a 2-phase pass (scheduled work for Camel 3.0) - if (childDefinition != null && outputDefinition != childDefinition) { - childDefinition.setParent(outputDefinition); - } - - // force the creation of an id - RouteDefinitionHelper.forceAssignIds(routeContext.getCamelContext(), definition); - // setup instrumentation processor for management (jmx) // this is later used in postInitChannel as we need to setup the error handler later as well ManagementInterceptStrategy managed = routeContext.getManagementInterceptStrategy(); if (managed != null) { - instrumentationProcessor = managed.createProcessor(targetOutputDef, target); + instrumentationProcessor = managed.createProcessor(targetOutputDef, nextProcessor); } - // then wrap the output with the backlog and tracer (backlog first, as we do not want regular tracer to trace the backlog) + // then wrap the output with the backlog and tracer (backlog first, + // as we do not want regular tracer to trace the backlog) BacklogTracer tracer = getOrCreateBacklogTracer(); camelContext.setExtension(BacklogTracer.class, tracer); - RouteDefinition route = ProcessorDefinitionHelper.getRoute(definition); - boolean first = false; - if (route != null && !route.getOutputs().isEmpty()) { - first = route.getOutputs().get(0) == definition; - } addAdvice(new BacklogTracerAdvice(tracer, targetOutputDef, route, first)); // add debugger as well so we have both tracing and debugging out of the box BacklogDebugger debugger = getOrCreateBacklogDebugger(); camelContext.addService(debugger); - addAdvice(new BacklogDebuggerAdvice(debugger, target, targetOutputDef)); + addAdvice(new BacklogDebuggerAdvice(debugger, nextProcessor, targetOutputDef)); if (routeContext.isMessageHistory()) { // add message history advice @@ -242,12 +203,13 @@ public class DefaultChannel extends CamelInternalProcessor implements ModelChann // reverse list so the first will be wrapped last, as it would then be first being invoked Collections.reverse(interceptors); // wrap the output with the configured interceptors + Processor target = nextProcessor; for (InterceptStrategy strategy : interceptors) { - next = target == nextProcessor ? null : nextProcessor; + Processor next = target == nextProcessor ? null : nextProcessor; // use the fine grained definition (eg the child if available). Its always possible to get back to the parent Processor wrapped = strategy.wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, target, next); if (!(wrapped instanceof AsyncProcessor)) { - log.warn("Interceptor: " + strategy + " at: " + outputDefinition + " does not return an AsyncProcessor instance." + log.warn("Interceptor: " + strategy + " at: " + definition + " does not return an AsyncProcessor instance." + " This causes the asynchronous routing engine to not work as optimal as possible." + " See more details at the InterceptStrategy javadoc." + " Camel will use a bridge to adapt the interceptor to the asynchronous routing engine," @@ -272,8 +234,12 @@ public class DefaultChannel extends CamelInternalProcessor implements ModelChann output = target; } - @Override - public void postInitChannel(ProcessorDefinition<?> outputDefinition, RouteContext routeContext) throws Exception { + /** + * Post initializes the channel. + * + * @throws Exception is thrown if some error occurred + */ + public void postInitChannel() throws Exception { // if jmx was enabled for the processor then either add as advice or wrap and change the processor // on the error handler. See more details in the class javadoc of InstrumentationProcessor if (instrumentationProcessor != null) { diff --git a/core/camel-core/src/main/java/org/apache/camel/model/ModelChannel.java b/core/camel-core/src/main/java/org/apache/camel/model/ModelChannel.java deleted file mode 100644 index c655f60..0000000 --- a/core/camel-core/src/main/java/org/apache/camel/model/ModelChannel.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.model; - -import org.apache.camel.Channel; -import org.apache.camel.spi.RouteContext; - -public interface ModelChannel extends Channel { - - /** - * Initializes the channel. - * - * @param outputDefinition the route definition the {@link Channel} represents - * @param routeContext the route context - * @throws Exception is thrown if some error occurred - */ - void initChannel(ProcessorDefinition<?> outputDefinition, RouteContext routeContext) throws Exception; - - /** - * Post initializes the channel. - * - * @param outputDefinition the route definition the {@link Channel} represents - * @param routeContext the route context - * @throws Exception is thrown if some error occurred - */ - void postInitChannel(ProcessorDefinition<?> outputDefinition, RouteContext routeContext) throws Exception; - - /** - * If the initialized output definition contained outputs (children) then we need to - * set the child so we can leverage fine grained tracing - * - * @param child the child - */ - void setChildDefinition(ProcessorDefinition<?> child); - - /** - * Gets the definition of the next processor - * - * @return the processor definition - */ - ProcessorDefinition<?> getProcessorDefinition(); -} diff --git a/core/camel-core/src/main/java/org/apache/camel/reifier/ProcessorReifier.java b/core/camel-core/src/main/java/org/apache/camel/reifier/ProcessorReifier.java index f689433..9d18764 100644 --- a/core/camel-core/src/main/java/org/apache/camel/reifier/ProcessorReifier.java +++ b/core/camel-core/src/main/java/org/apache/camel/reifier/ProcessorReifier.java @@ -50,7 +50,6 @@ import org.apache.camel.model.LoadBalanceDefinition; import org.apache.camel.model.LogDefinition; import org.apache.camel.model.LoopDefinition; import org.apache.camel.model.MarshalDefinition; -import org.apache.camel.model.ModelChannel; import org.apache.camel.model.MulticastDefinition; import org.apache.camel.model.OnCompletionDefinition; import org.apache.camel.model.OnExceptionDefinition; @@ -71,6 +70,7 @@ import org.apache.camel.model.RemovePropertyDefinition; import org.apache.camel.model.ResequenceDefinition; import org.apache.camel.model.RollbackDefinition; import org.apache.camel.model.RouteDefinition; +import org.apache.camel.model.RouteDefinitionHelper; import org.apache.camel.model.RoutingSlipDefinition; import org.apache.camel.model.SagaDefinition; import org.apache.camel.model.SamplingDefinition; @@ -107,7 +107,6 @@ import org.apache.camel.reifier.errorhandler.ErrorHandlerReifier; import org.apache.camel.spi.IdAware; import org.apache.camel.spi.InterceptStrategy; import org.apache.camel.spi.LifecycleStrategy; -import org.apache.camel.spi.NodeIdFactory; import org.apache.camel.spi.RouteContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -267,7 +266,7 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> { * Wraps the child processor in whatever necessary interceptors and error handlers */ public Channel wrapProcessor(RouteContext routeContext, Processor processor) throws Exception { - // dont double wrap + // don't double wrap if (processor instanceof Channel) { return (Channel) processor; } @@ -280,18 +279,44 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> { protected Channel wrapChannel(RouteContext routeContext, Processor processor, ProcessorDefinition<?> child, Boolean inheritErrorHandler) throws Exception { // put a channel in between this and each output to control the route flow logic - ModelChannel channel = createChannel(routeContext); - channel.setNextProcessor(processor); + DefaultChannel channel = new DefaultChannel(); // add interceptor strategies to the channel must be in this order: camel context, route context, local - addInterceptStrategies(routeContext, channel, routeContext.getCamelContext().adapt(ExtendedCamelContext.class).getInterceptStrategies()); - addInterceptStrategies(routeContext, channel, routeContext.getInterceptStrategies()); - addInterceptStrategies(routeContext, channel, definition.getInterceptStrategies()); + List<InterceptStrategy> interceptors = new ArrayList<>(); + addInterceptStrategies(routeContext, interceptors, routeContext.getCamelContext().adapt(ExtendedCamelContext.class).getInterceptStrategies()); + addInterceptStrategies(routeContext, interceptors, routeContext.getInterceptStrategies()); + addInterceptStrategies(routeContext, interceptors, definition.getInterceptStrategies()); + + // force the creation of an id + RouteDefinitionHelper.forceAssignIds(routeContext.getCamelContext(), definition); + + // fix parent/child relationship. This will be the case of the routes has been + // defined using XML DSL or end user may have manually assembled a route from the model. + // Background note: parent/child relationship is assembled on-the-fly when using Java DSL (fluent builders) + // where as when using XML DSL (JAXB) then it fixed after, but if people are using custom interceptors + // then we need to fix the parent/child relationship beforehand, and thus we can do it here + // ideally we need the design time route -> runtime route to be a 2-phase pass (scheduled work for Camel 3.0) + if (child != null && definition != child) { + child.setParent(definition); + } // set the child before init the channel - channel.setChildDefinition(child); - channel.initChannel(definition, routeContext); + RouteDefinition route = ProcessorDefinitionHelper.getRoute(definition); + boolean first = false; + if (route != null && !route.getOutputs().isEmpty()) { + first = route.getOutputs().get(0) == definition; + } + // set scoping + boolean routeScoped = true; + if (definition instanceof OnExceptionDefinition) { + routeScoped = ((OnExceptionDefinition) definition).isRouteScoped(); + } else if (this.definition instanceof OnCompletionDefinition) { + routeScoped = ((OnCompletionDefinition) definition).isRouteScoped(); + } + // initialize the channel + channel.initChannel(routeContext, definition, child, interceptors, processor, route, first, routeScoped); + boolean wrap = false; // set the error handler, must be done after init as we can set the error handler as first in the chain if (definition instanceof TryDefinition || definition instanceof CatchDefinition || definition instanceof FinallyDefinition) { // do not use error handler for try .. catch .. finally blocks as it will handle errors itself @@ -310,7 +335,7 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> { // however if inherit error handler is enabled, we need to wrap an error handler on the hystrix parent if (inheritErrorHandler != null && inheritErrorHandler && child == null) { // only wrap the parent (not the children of the hystrix) - wrapChannelInErrorHandler(channel, routeContext, inheritErrorHandler); + wrap = true; } else { log.trace("{} is part of HystrixCircuitBreaker so no error handler is applied", definition); } @@ -321,17 +346,20 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> { boolean isShareUnitOfWork = def.getShareUnitOfWork() != null && def.getShareUnitOfWork(); if (isShareUnitOfWork && child == null) { // only wrap the parent (not the children of the multicast) - wrapChannelInErrorHandler(channel, routeContext, inheritErrorHandler); + wrap = true; } else { log.trace("{} is part of multicast which have special error handling so no error handler is applied", definition); } } else { // use error handler by default or if configured to do so + wrap = true; + } + if (wrap) { wrapChannelInErrorHandler(channel, routeContext, inheritErrorHandler); } // do post init at the end - channel.postInitChannel(definition, routeContext); + channel.postInitChannel(); log.trace("{} wrapped in Channel: {}", definition, channel); return channel; @@ -345,7 +373,7 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> { * @param inheritErrorHandler whether to inherit error handler * @throws Exception can be thrown if failed to create error handler builder */ - private void wrapChannelInErrorHandler(Channel channel, RouteContext routeContext, Boolean inheritErrorHandler) throws Exception { + private void wrapChannelInErrorHandler(DefaultChannel channel, RouteContext routeContext, Boolean inheritErrorHandler) throws Exception { if (inheritErrorHandler == null || inheritErrorHandler) { log.trace("{} is configured to inheritErrorHandler", definition); Processor output = channel.getOutput(); @@ -382,10 +410,10 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> { * Adds the given list of interceptors to the channel. * * @param routeContext the route context - * @param channel the channel to add strategies + * @param interceptors the list to add strategies * @param strategies list of strategies to add. */ - protected void addInterceptStrategies(RouteContext routeContext, Channel channel, List<InterceptStrategy> strategies) { + protected void addInterceptStrategies(RouteContext routeContext, List<InterceptStrategy> interceptors, List<InterceptStrategy> strategies) { for (InterceptStrategy strategy : strategies) { if (!routeContext.isHandleFault() && strategy instanceof HandleFault) { // handle fault is disabled so we should not add it @@ -393,7 +421,7 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> { } // add strategy - channel.addInterceptStrategy(strategy); + interceptors.add(strategy); } } @@ -405,13 +433,6 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> { return Pipeline.newInstance(routeContext.getCamelContext(), list); } - /** - * Creates a new instance of the {@link Channel}. - */ - protected ModelChannel createChannel(RouteContext routeContext) throws Exception { - return new DefaultChannel(); - } - protected Processor createOutputsProcessor(RouteContext routeContext, Collection<ProcessorDefinition<?>> outputs) throws Exception { // We will save list of actions to restore the outputs back to the original state. Runnable propertyPlaceholdersChangeReverter = ProcessorDefinitionHelper.createPropertyPlaceholdersChangeReverter(); diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RandomLoadBalanceJavaDSLBuilderTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/RandomLoadBalanceJavaDSLBuilderTest.java index afb71c6..c3bd4b1 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/RandomLoadBalanceJavaDSLBuilderTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/RandomLoadBalanceJavaDSLBuilderTest.java @@ -79,7 +79,7 @@ public class RandomLoadBalanceJavaDSLBuilderTest extends RandomLoadBalanceTest { if (nav instanceof DefaultChannel) { DefaultChannel channel = (DefaultChannel) nav; - ProcessorDefinition<?> def = channel.getProcessorDefinition(); + ProcessorDefinition<?> def = (ProcessorDefinition<?>) channel.getProcessorDefinition(); navigateDefinition(def, sb); } }