This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch when in repository https://gitbox.apache.org/repos/asf/camel.git
commit 5ee90810238622ca781ac71afac9c4999812f8e0 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Wed Jan 15 12:53:04 2025 +0100 CAMEL-21620: camel-core - Fix onWhen to not include outputs in model --- .../catalog/models/whenSkipSendToEndpoint.json | 2 +- .../apache/camel/spi/InterceptEndpointFactory.java | 4 +- .../apache/camel/spi/InterceptSendToEndpoint.java | 11 +++++ .../apache/camel/spi/InternalProcessorFactory.java | 3 +- .../engine/DefaultInterceptEndpointFactory.java | 5 ++- .../apache/camel/model/whenSkipSendToEndpoint.json | 2 +- .../apache/camel/model/InterceptDefinition.java | 31 -------------- .../model/InterceptSendToEndpointDefinition.java | 31 ++++---------- .../apache/camel/model/RouteDefinitionHelper.java | 2 - .../model/WhenSkipSendToEndpointDefinition.java | 1 + .../processor/DefaultInternalProcessorFactory.java | 5 ++- .../processor/InterceptSendToEndpointCallback.java | 9 +++-- .../InterceptSendToEndpointProcessor.java | 5 ++- .../apache/camel/reifier/InterceptFromReifier.java | 16 +++++++- .../org/apache/camel/reifier/InterceptReifier.java | 15 ++++++- .../reifier/InterceptSendToEndpointReifier.java | 47 +++++++++++++++++++++- .../reifier/WhenSkipSendToEndpointReifier.java | 1 + .../file/FileConsumerInterceptEmptyFileTest.java | 2 +- .../InterceptCustomPredicateAsFilterTest.java | 2 +- .../InterceptFromPredicateProceedAndStopTest.java | 6 +-- .../InterceptFromSimplePredicateTest.java | 2 +- .../InterceptFromSimplePredicateWithStopTest.java | 2 +- .../intercept/InterceptFromSimpleRouteTest.java | 2 +- .../intercept/InterceptFromWhenNoStopTest.java | 2 +- .../processor/intercept/InterceptFromWhenTest.java | 2 +- .../intercept/InterceptFromWhenWithChoiceTest.java | 2 +- ...erceptFromWithPredicateAndProceedRouteTest.java | 2 +- ...InterceptFromWithPredicateAndStopRouteTest.java | 2 +- .../InterceptFromWithPredicateRouteTest.java | 2 +- .../intercept/InterceptFromWithPredicateTest.java | 2 +- .../intercept/InterceptSendToEndpointTest.java | 2 +- .../InterceptSimpleRouteWhenStopTest.java | 2 +- .../intercept/InterceptSimpleRouteWhenTest.java | 2 +- .../camel/management/ManagedInterceptFromTest.java | 2 +- .../support/DefaultInterceptSendToEndpoint.java | 12 +++++- .../dsl/yaml/deserializers/ModelDeserializers.java | 2 +- .../generated/resources/schema/camelYamlDsl.json | 1 + 37 files changed, 149 insertions(+), 94 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/whenSkipSendToEndpoint.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/whenSkipSendToEndpoint.json index bb1b992d092..9e25d06a048 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/whenSkipSendToEndpoint.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/whenSkipSendToEndpoint.json @@ -4,7 +4,7 @@ "name": "whenSkipSendToEndpoint", "title": "When Skip Send To Endpoint", "description": "Predicate to determine if the message should be sent or not to the endpoint, when using interceptSentToEndpoint.", - "deprecated": false, + "deprecated": true, "label": "configuration", "javaType": "org.apache.camel.model.WhenSkipSendToEndpointDefinition", "abstract": false, diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/InterceptEndpointFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/InterceptEndpointFactory.java index 01fa2711e65..909d52fa61f 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/InterceptEndpointFactory.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/InterceptEndpointFactory.java @@ -18,6 +18,7 @@ package org.apache.camel.spi; import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; +import org.apache.camel.Predicate; import org.apache.camel.Processor; /** @@ -31,12 +32,13 @@ public interface InterceptEndpointFactory { * @param camelContext the camel context * @param endpoint the endpoint to intercept * @param skip whether to skip sending to the original endpoint + * @param onWhen optional predicate to trigger this interceptor * @param before the processor to execute before intercepting * @param after the processor to execute after intercepted * @return the endpoint with intercepting behaviour */ Endpoint createInterceptSendToEndpoint( CamelContext camelContext, Endpoint endpoint, boolean skip, - Processor before, Processor after); + Predicate onWhen, Processor before, Processor after); } diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/InterceptSendToEndpoint.java b/core/camel-api/src/main/java/org/apache/camel/spi/InterceptSendToEndpoint.java index 7b26b9a70d2..5a4417f353f 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/InterceptSendToEndpoint.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/InterceptSendToEndpoint.java @@ -17,6 +17,7 @@ package org.apache.camel.spi; import org.apache.camel.Endpoint; +import org.apache.camel.Predicate; import org.apache.camel.Processor; /** @@ -30,6 +31,16 @@ public interface InterceptSendToEndpoint extends Endpoint { */ Endpoint getOriginalEndpoint(); + /** + * Optional predicate that must match to trigger this interceptor. + */ + Predicate getOnWhen(); + + /** + * Optional predicate that must match to trigger this interceptor. + */ + void setOnWhen(Predicate onWhen); + /** * The processor for routing in a detour before sending to the original endpoint. */ diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java index 8a56f21b05a..0d2424ddd0f 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java @@ -21,6 +21,7 @@ import org.apache.camel.AsyncProducer; import org.apache.camel.CamelContext; import org.apache.camel.Channel; import org.apache.camel.Endpoint; +import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.Route; @@ -50,7 +51,7 @@ public interface InternalProcessorFactory { Channel createChannel(CamelContext camelContext); AsyncProducer createInterceptSendToEndpointProcessor( - InterceptSendToEndpoint endpoint, Endpoint delegate, AsyncProducer producer, boolean skip); + InterceptSendToEndpoint endpoint, Endpoint delegate, AsyncProducer producer, boolean skip, Predicate onWhen); AsyncProcessor createWrapProcessor(Processor processor, Processor wrapped); diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultInterceptEndpointFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultInterceptEndpointFactory.java index 9d9492faedf..73d1e63cdb8 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultInterceptEndpointFactory.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultInterceptEndpointFactory.java @@ -18,6 +18,7 @@ package org.apache.camel.impl.engine; import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; +import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.spi.InterceptEndpointFactory; import org.apache.camel.support.DefaultInterceptSendToEndpoint; @@ -29,10 +30,12 @@ public class DefaultInterceptEndpointFactory implements InterceptEndpointFactory @Override public Endpoint createInterceptSendToEndpoint( - CamelContext camelContext, Endpoint endpoint, boolean skip, Processor before, Processor after) { + CamelContext camelContext, Endpoint endpoint, boolean skip, Predicate onWhen, Processor before, Processor after) { DefaultInterceptSendToEndpoint answer = new DefaultInterceptSendToEndpoint(endpoint, skip); + answer.setOnWhen(onWhen); answer.setBefore(before); answer.setAfter(after); return answer; } + } diff --git a/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/whenSkipSendToEndpoint.json b/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/whenSkipSendToEndpoint.json index bb1b992d092..9e25d06a048 100644 --- a/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/whenSkipSendToEndpoint.json +++ b/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/whenSkipSendToEndpoint.json @@ -4,7 +4,7 @@ "name": "whenSkipSendToEndpoint", "title": "When Skip Send To Endpoint", "description": "Predicate to determine if the message should be sent or not to the endpoint, when using interceptSentToEndpoint.", - "deprecated": false, + "deprecated": true, "label": "configuration", "javaType": "org.apache.camel.model.WhenSkipSendToEndpointDefinition", "abstract": false, diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/InterceptDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/InterceptDefinition.java index b7a567f44e8..147192b53da 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/InterceptDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/InterceptDefinition.java @@ -126,37 +126,6 @@ public class InterceptDefinition extends OutputDefinition<InterceptDefinition> { return this; } - /** - * This method is <b>only</b> for handling some post configuration that is needed since this is an interceptor, and - * we have to do a bit of magic logic to fixup to handle predicates with or without proceed/stop set as well. - */ - public void afterPropertiesSet() { - System.out.println("A"); - if (getOutputs().isEmpty()) { - // no outputs - return; - } - - // TODO: Make special reifier so we do not manipulate model here - - System.out.println("B"); - - if (onWhen != null) { - System.out.println("C"); - // change onWhen to when that also includes the outputs - // so they are only triggered if the predicate matches at runtime - WhenDefinition copy = new WhenDefinition(onWhen); - copy.setParent(this); - for (ProcessorDefinition<?> out : outputs) { - copy.addOutput(out); - } - clearOutput(); - outputs.add(copy); - System.out.println("D"); - } - System.out.println("E"); - } - public List<Processor> getIntercepted() { return intercepted; } diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/InterceptSendToEndpointDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/InterceptSendToEndpointDefinition.java index 04c315c938d..577443d7f2b 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/InterceptSendToEndpointDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/InterceptSendToEndpointDefinition.java @@ -159,31 +159,14 @@ public class InterceptSendToEndpointDefinition extends OutputDefinition<Intercep * This method is <b>only</b> for handling some post configuration that is needed since this is an interceptor, and * we have to do a bit of magic logic to fixup to handle predicates with or without proceed/stop set as well. */ + @Deprecated public void afterPropertiesSet() { - System.out.println("A"); - if (getOutputs().isEmpty()) { - // no outputs - return; - } - - // TODO: Make special reifier so we do not manipulate model here - - System.out.println("B"); - - if (onWhen != null) { - System.out.println("C"); - // change onWhen to when that also includes the outputs - // so they are only triggered if the predicate matches at runtime - WhenDefinition copy = new WhenDefinition(onWhen); - copy.setParent(this); - for (ProcessorDefinition<?> out : outputs) { - copy.addOutput(out); - } - clearOutput(); - outputs.add(copy); - System.out.println("D"); - } - System.out.println("E"); + /** + * if (getOutputs().isEmpty()) { // no outputs return; } if (onWhen != null) { // change onWhen to when that + * also includes the outputs // so they are only triggered if the predicate matches at runtime WhenDefinition + * copy = new WhenDefinition(onWhen); copy.setParent(this); for (ProcessorDefinition<?> out : outputs) { + * copy.addOutput(out); } clearOutput(); outputs.add(copy); } + **/ } public String getSkipSendToOriginalEndpoint() { diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/RouteDefinitionHelper.java b/core/camel-core-model/src/main/java/org/apache/camel/model/RouteDefinitionHelper.java index d684aae79a5..e416aa4eeed 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/RouteDefinitionHelper.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/RouteDefinitionHelper.java @@ -578,7 +578,6 @@ public final class RouteDefinitionHelper { // configure intercept if (intercepts != null && !intercepts.isEmpty()) { for (InterceptDefinition intercept : intercepts) { - intercept.afterPropertiesSet(); // init the parent initParent(intercept); // add as first output so intercept is handled before the actual @@ -643,7 +642,6 @@ public final class RouteDefinitionHelper { } if (match) { - intercept.afterPropertiesSet(); // init the parent initParent(intercept); // add as first output so intercept is handled before the diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/WhenSkipSendToEndpointDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/WhenSkipSendToEndpointDefinition.java index bd2a310404d..a4c3362b054 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/WhenSkipSendToEndpointDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/WhenSkipSendToEndpointDefinition.java @@ -30,6 +30,7 @@ import org.apache.camel.spi.Metadata; @AsPredicate @XmlRootElement(name = "whenSkipSendToEndpoint") @XmlTransient // do not output in XSD as this is only used in a special "hack" for intercept +@Deprecated public class WhenSkipSendToEndpointDefinition extends OnWhenDefinition { public WhenSkipSendToEndpointDefinition() { diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java index 45920bed5c1..74118ab510a 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java @@ -21,6 +21,7 @@ import org.apache.camel.AsyncProducer; import org.apache.camel.CamelContext; import org.apache.camel.Channel; import org.apache.camel.Endpoint; +import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.Route; @@ -60,8 +61,8 @@ public class DefaultInternalProcessorFactory implements InternalProcessorFactory } public AsyncProducer createInterceptSendToEndpointProcessor( - InterceptSendToEndpoint endpoint, Endpoint delegate, AsyncProducer producer, boolean skip) { - return new InterceptSendToEndpointProcessor(endpoint, delegate, producer, skip); + InterceptSendToEndpoint endpoint, Endpoint delegate, AsyncProducer producer, boolean skip, Predicate onWhen) { + return new InterceptSendToEndpointProcessor(endpoint, delegate, producer, skip, onWhen); } public AsyncProcessor createWrapProcessor(Processor processor, Processor wrapped) { diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointCallback.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointCallback.java index a6bca20f705..536c8e39074 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointCallback.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointCallback.java @@ -18,6 +18,7 @@ package org.apache.camel.processor; import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; +import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.spi.EndpointStrategy; import org.apache.camel.spi.InterceptSendToEndpoint; @@ -31,18 +32,20 @@ import org.apache.camel.util.URISupport; public class InterceptSendToEndpointCallback implements EndpointStrategy { private final CamelContext camelContext; + private final Predicate onWhen; private final Processor before; private final Processor after; private final String matchURI; private final boolean skip; - public InterceptSendToEndpointCallback(CamelContext camelContext, Processor before, Processor after, String matchURI, - boolean skip) { + public InterceptSendToEndpointCallback(CamelContext camelContext, Processor before, Processor after, + String matchURI, boolean skip, Predicate onWhen) { this.camelContext = camelContext; this.before = before; this.after = after; this.matchURI = matchURI; this.skip = skip; + this.onWhen = onWhen; } public Endpoint registerEndpoint(String uri, Endpoint endpoint) { @@ -53,7 +56,7 @@ public class InterceptSendToEndpointCallback implements EndpointStrategy { // only proxy if the uri is matched decorate endpoint with // our proxy should be false by default return PluginHelper.getInterceptEndpointFactory(camelContext) - .createInterceptSendToEndpoint(camelContext, endpoint, skip, before, after); + .createInterceptSendToEndpoint(camelContext, endpoint, skip, onWhen, before, after); } else { // no proxy so return regular endpoint return endpoint; diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java index 1c2beca9122..ee03d5ef80f 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java @@ -50,15 +50,17 @@ public class InterceptSendToEndpointProcessor extends DefaultAsyncProducer { private final Endpoint delegate; private final AsyncProducer producer; private final boolean skip; + private final Predicate onWhen; private AsyncProcessor pipeline; public InterceptSendToEndpointProcessor(InterceptSendToEndpoint endpoint, Endpoint delegate, AsyncProducer producer, - boolean skip) { + boolean skip, Predicate onWhen) { super(delegate); this.endpoint = endpoint; this.delegate = delegate; this.producer = producer; this.skip = skip; + this.onWhen = onWhen != null ? onWhen : p -> true; } @Override @@ -156,6 +158,7 @@ public class InterceptSendToEndpointProcessor extends DefaultAsyncProducer { private FilterProcessor createFilterProcessor() { Predicate predicate = exchange -> { + onWhen.matches(exchange); Boolean whenMatches = (Boolean) exchange.removeProperty(ExchangePropertyKey.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED); return whenMatches == null || whenMatches; diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/InterceptFromReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/InterceptFromReifier.java index 4f9cc2cbc04..db979549fda 100644 --- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/InterceptFromReifier.java +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/InterceptFromReifier.java @@ -19,10 +19,12 @@ package org.apache.camel.reifier; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.ExchangePropertyKey; +import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.Route; import org.apache.camel.model.InterceptFromDefinition; import org.apache.camel.model.ProcessorDefinition; +import org.apache.camel.processor.FilterProcessor; import org.apache.camel.support.processor.DelegateAsyncProcessor; public class InterceptFromReifier extends InterceptReifier<InterceptFromDefinition> { @@ -33,9 +35,10 @@ public class InterceptFromReifier extends InterceptReifier<InterceptFromDefiniti @Override public Processor createProcessor() throws Exception { - final Processor child = this.createChildProcessor(true); + Processor child = this.createChildProcessor(true); - return new DelegateAsyncProcessor(child) { + // TODO: Pipeline with set property and true filter if onWhen is empty + child = new DelegateAsyncProcessor(child) { @Override public boolean process(Exchange exchange, AsyncCallback callback) { if (exchange.getFromEndpoint() != null) { @@ -44,6 +47,15 @@ public class InterceptFromReifier extends InterceptReifier<InterceptFromDefiniti return super.process(exchange, callback); } }; + + Predicate when = null; + if (definition.getOnWhen() != null) { + when = createPredicate(definition.getOnWhen().getExpression()); + } + if (when != null) { + child = new FilterProcessor(getCamelContext(), when, child); + } + return child; } } diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/InterceptReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/InterceptReifier.java index 1d48bcf0bd4..b8444e5b4da 100644 --- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/InterceptReifier.java +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/InterceptReifier.java @@ -18,11 +18,13 @@ package org.apache.camel.reifier; import org.apache.camel.CamelContext; import org.apache.camel.NamedNode; +import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.Route; import org.apache.camel.model.InterceptDefinition; import org.apache.camel.model.ProcessorDefinition; import org.apache.camel.model.RouteDefinition; +import org.apache.camel.processor.FilterProcessor; import org.apache.camel.processor.Pipeline; import org.apache.camel.spi.InterceptStrategy; @@ -35,7 +37,16 @@ public class InterceptReifier<T extends InterceptDefinition> extends ProcessorRe @Override public Processor createProcessor() throws Exception { // create the output processor - Processor output = this.createChildProcessor(true); + Processor child = this.createChildProcessor(true); + + Predicate when = null; + if (definition.getOnWhen() != null) { + when = createPredicate(definition.getOnWhen().getExpression()); + } + if (when != null) { + child = new FilterProcessor(getCamelContext(), when, child); + } + final Processor output = child; // add the output as an intercept strategy to the route context so its // invoked on each processing step @@ -49,7 +60,7 @@ public class InterceptReifier<T extends InterceptDefinition> extends ProcessorRe // store the target we are intercepting this.interceptedTarget = target; - // remember the target that was intercepted + // remember the target that was intercepted // TODO: Remove me InterceptReifier.this.definition.getIntercepted().add(interceptedTarget); if (interceptedTarget != null) { diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/InterceptSendToEndpointReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/InterceptSendToEndpointReifier.java index fc4f53f659d..91e5dba9905 100644 --- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/InterceptSendToEndpointReifier.java +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/InterceptSendToEndpointReifier.java @@ -18,6 +18,10 @@ package org.apache.camel.reifier; import java.util.List; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePropertyKey; +import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.Route; import org.apache.camel.model.InterceptSendToEndpointDefinition; @@ -54,9 +58,15 @@ public class InterceptSendToEndpointReifier extends ProcessorReifier<InterceptSe final String matchURI = parseString(definition.getUri()); final boolean skip = parseBoolean(definition.getSkipSendToOriginalEndpoint(), false); + Predicate when = null; + if (definition.getOnWhen() != null) { + when = new OnWhenPredicate(createPredicate(definition.getOnWhen().getExpression())); + } + // register endpoint callback so we can proxy the endpoint camelContext.getCamelContextExtension() - .registerEndpointCallback(new InterceptSendToEndpointCallback(camelContext, before, after, matchURI, skip)); + .registerEndpointCallback( + new InterceptSendToEndpointCallback(camelContext, before, after, matchURI, skip, when)); // remove the original intercepted route from the outputs as we do not // intercept as the regular interceptor @@ -70,4 +80,39 @@ public class InterceptSendToEndpointReifier extends ProcessorReifier<InterceptSe return new InterceptEndpointProcessor(matchURI, before); } + /** + * Wrap in predicate to set filter marker we need to keep track whether the when matches or not, so delegate the + * predicate and add the matches result as a property on the exchange + */ + private static class OnWhenPredicate implements Predicate { + + private final Predicate delegate; + + public OnWhenPredicate(Predicate delegate) { + this.delegate = delegate; + } + + @Override + public boolean matches(Exchange exchange) { + boolean matches = delegate.matches(exchange); + exchange.setProperty(ExchangePropertyKey.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED, matches); + return matches; + } + + @Override + public void init(CamelContext context) { + delegate.init(context); + } + + @Override + public void initPredicate(CamelContext context) { + delegate.initPredicate(context); + } + + @Override + public String toString() { + return delegate.toString(); + } + } + } diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WhenSkipSendToEndpointReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WhenSkipSendToEndpointReifier.java index 081d7619e17..4821a3d3988 100644 --- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WhenSkipSendToEndpointReifier.java +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WhenSkipSendToEndpointReifier.java @@ -24,6 +24,7 @@ import org.apache.camel.model.ProcessorDefinition; import org.apache.camel.model.WhenSkipSendToEndpointDefinition; import org.apache.camel.processor.FilterProcessor; +@Deprecated public class WhenSkipSendToEndpointReifier extends ExpressionReifier<WhenSkipSendToEndpointDefinition> { public WhenSkipSendToEndpointReifier(Route route, ProcessorDefinition<?> definition) { diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerInterceptEmptyFileTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerInterceptEmptyFileTest.java index 1cb7c45aa6f..08a6ed14bad 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerInterceptEmptyFileTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerInterceptEmptyFileTest.java @@ -51,7 +51,7 @@ public class FileConsumerInterceptEmptyFileTest extends ContextTestSupport { protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { public void configure() { - interceptFrom().when(simple("${file:length} == 0")).to("mock:skip").stop(); + interceptFrom().onWhen(simple("${file:length} == 0")).to("mock:skip").stop(); from(fileUri("?initialDelay=10&delay=10")) .convertBodyTo(String.class).to("log:test") diff --git a/core/camel-core/src/test/java/org/apache/camel/issues/InterceptCustomPredicateAsFilterTest.java b/core/camel-core/src/test/java/org/apache/camel/issues/InterceptCustomPredicateAsFilterTest.java index bc18d874bb7..e773d1998db 100644 --- a/core/camel-core/src/test/java/org/apache/camel/issues/InterceptCustomPredicateAsFilterTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/issues/InterceptCustomPredicateAsFilterTest.java @@ -71,7 +71,7 @@ public class InterceptCustomPredicateAsFilterTest extends ContextTestSupport { @Override public void configure() { // secret messages should be filtered out asap - intercept().when(not(filter)).to("mock:secret").stop(); + intercept().onWhen(not(filter)).to("mock:secret").stop(); from("direct:start").to("mock:good"); } diff --git a/core/camel-core/src/test/java/org/apache/camel/issues/InterceptFromPredicateProceedAndStopTest.java b/core/camel-core/src/test/java/org/apache/camel/issues/InterceptFromPredicateProceedAndStopTest.java index bd37ebb4d5e..b3639a26faa 100644 --- a/core/camel-core/src/test/java/org/apache/camel/issues/InterceptFromPredicateProceedAndStopTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/issues/InterceptFromPredicateProceedAndStopTest.java @@ -93,7 +93,7 @@ public class InterceptFromPredicateProceedAndStopTest extends ContextTestSupport public void testInterceptorWithPredicate() throws Exception { context.addRoutes(new RouteBuilder() { public void configure() { - interceptFrom().when(header("user").isEqualTo("test")).to("mock:test"); + interceptFrom().onWhen(header("user").isEqualTo("test")).to("mock:test"); from("seda:order").to("mock:ok"); } }); @@ -114,7 +114,7 @@ public class InterceptFromPredicateProceedAndStopTest extends ContextTestSupport public void testInterceptorWithPredicateAndProceed() throws Exception { context.addRoutes(new RouteBuilder() { public void configure() { - interceptFrom().when(header("user").isEqualTo("test")).to("mock:test"); + interceptFrom().onWhen(header("user").isEqualTo("test")).to("mock:test"); from("seda:order").to("mock:ok"); } }); @@ -135,7 +135,7 @@ public class InterceptFromPredicateProceedAndStopTest extends ContextTestSupport public void testInterceptorWithPredicateAndStop() throws Exception { context.addRoutes(new RouteBuilder() { public void configure() { - interceptFrom().when(header("user").isEqualTo("test")).to("mock:test").stop(); + interceptFrom().onWhen(header("user").isEqualTo("test")).to("mock:test").stop(); from("seda:order").to("mock:ok"); } }); diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromSimplePredicateTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromSimplePredicateTest.java index 30a742a5cc5..88287dd3719 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromSimplePredicateTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromSimplePredicateTest.java @@ -52,7 +52,7 @@ public class InterceptFromSimplePredicateTest extends ContextTestSupport { return new RouteBuilder() { public void configure() { // START SNIPPET: e1 - interceptFrom().when(header("usertype").isEqualTo("test")).process(new MyTestServiceProcessor()) + interceptFrom().onWhen(header("usertype").isEqualTo("test")).process(new MyTestServiceProcessor()) .to("mock:intercepted"); // and here is our route diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromSimplePredicateWithStopTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromSimplePredicateWithStopTest.java index 729373c0e02..dba1a0ead77 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromSimplePredicateWithStopTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromSimplePredicateWithStopTest.java @@ -40,7 +40,7 @@ public class InterceptFromSimplePredicateWithStopTest extends ContextTestSupport return new RouteBuilder() { public void configure() { // START SNIPPET: e1 - interceptFrom().when(header("usertype").isEqualTo("test")) + interceptFrom().onWhen(header("usertype").isEqualTo("test")) // here we use stop() to tell Camel to NOT continue routing // the message. // this let us act as a filter, to drop certain messages. diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromSimpleRouteTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromSimpleRouteTest.java index d21c7d024a6..223cd0817ca 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromSimpleRouteTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromSimpleRouteTest.java @@ -47,7 +47,7 @@ public class InterceptFromSimpleRouteTest extends ContextTestSupport { public void configure() { // In Camel 1.4 proceed is default so we must use stop to not // route it to the result mock - interceptFrom().when(header("city").isEqualTo("London")).to("mock:intercepted").stop(); + interceptFrom().onWhen(header("city").isEqualTo("London")).to("mock:intercepted").stop(); from("seda:a").to("mock:result"); } }; diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromWhenNoStopTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromWhenNoStopTest.java index f5516ea3040..465b548f630 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromWhenNoStopTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromWhenNoStopTest.java @@ -47,7 +47,7 @@ public class InterceptFromWhenNoStopTest extends ContextTestSupport { return new RouteBuilder() { @Override public void configure() { - interceptFrom().when(simple("${body} contains 'Goofy'")).to("mock:goofy"); + interceptFrom().onWhen(simple("${body} contains 'Goofy'")).to("mock:goofy"); from("direct:start").to("mock:end"); } diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromWhenTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromWhenTest.java index 850506e957d..4d4ff10347c 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromWhenTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromWhenTest.java @@ -49,7 +49,7 @@ public class InterceptFromWhenTest extends ContextTestSupport { public void configure() { context.setTracing(true); - interceptFrom().when(simple("${body} contains 'Goofy'")).to("mock:goofy").stop(); + interceptFrom().onWhen(simple("${body} contains 'Goofy'")).to("mock:goofy").stop(); from("direct:start").to("mock:end"); } diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromWhenWithChoiceTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromWhenWithChoiceTest.java index 913f070e5c0..752b4179139 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromWhenWithChoiceTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromWhenWithChoiceTest.java @@ -63,7 +63,7 @@ public class InterceptFromWhenWithChoiceTest extends ContextTestSupport { return new RouteBuilder() { @Override public void configure() { - interceptFrom().when(simple("${body} contains 'Goofy'")).choice().when(body().contains("Hello")) + interceptFrom().onWhen(simple("${body} contains 'Goofy'")).choice().when(body().contains("Hello")) .to("mock:hello").otherwise().to("log:foo").to("mock:goofy").end() .stop(); diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromWithPredicateAndProceedRouteTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromWithPredicateAndProceedRouteTest.java index 9d441c558af..bc8ddf91b51 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromWithPredicateAndProceedRouteTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromWithPredicateAndProceedRouteTest.java @@ -23,7 +23,7 @@ public class InterceptFromWithPredicateAndProceedRouteTest extends InterceptFrom protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { public void configure() { - interceptFrom().when(header("foo").isEqualTo("bar")).to("mock:b"); + interceptFrom().onWhen(header("foo").isEqualTo("bar")).to("mock:b"); from("direct:start").to("mock:a"); } diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromWithPredicateAndStopRouteTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromWithPredicateAndStopRouteTest.java index 4f50d519e17..ae12e242771 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromWithPredicateAndStopRouteTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromWithPredicateAndStopRouteTest.java @@ -24,7 +24,7 @@ public class InterceptFromWithPredicateAndStopRouteTest extends InterceptFromRou protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { public void configure() { - interceptFrom().when(header("foo").isEqualTo("bar")).to("mock:b").stop(); + interceptFrom().onWhen(header("foo").isEqualTo("bar")).to("mock:b").stop(); from("direct:start").to("mock:a"); } diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromWithPredicateRouteTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromWithPredicateRouteTest.java index cd4f83f65ae..a8c7550e24c 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromWithPredicateRouteTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromWithPredicateRouteTest.java @@ -24,7 +24,7 @@ public class InterceptFromWithPredicateRouteTest extends InterceptFromRouteTestS return new RouteBuilder() { public void configure() { // no stop so the message will proceed in its normal route also - interceptFrom().when(header("foo").isEqualTo("bar")).to("mock:b"); + interceptFrom().onWhen(header("foo").isEqualTo("bar")).to("mock:b"); from("direct:start").to("mock:a"); } diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromWithPredicateTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromWithPredicateTest.java index c863aa6e129..06577e096c1 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromWithPredicateTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptFromWithPredicateTest.java @@ -25,7 +25,7 @@ public class InterceptFromWithPredicateTest extends InterceptFromRouteTestSuppor return new RouteBuilder() { public void configure() { // intercept with a predicate test - interceptFrom().when(header("foo").isEqualTo("bar")).to("mock:b").stop(); + interceptFrom().onWhen(header("foo").isEqualTo("bar")).to("mock:b").stop(); from("direct:start").to("mock:a"); } diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptSendToEndpointTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptSendToEndpointTest.java index 207557f1903..0b9b866533e 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptSendToEndpointTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptSendToEndpointTest.java @@ -73,7 +73,7 @@ public class InterceptSendToEndpointTest extends ContextTestSupport { // we can also attach a predicate to the endpoint interceptor. // So in this example the exchange is // only intercepted if the body is Hello World - interceptSendToEndpoint("mock:foo").when(body().isEqualTo("Hello World")).to("mock:detour") + interceptSendToEndpoint("mock:foo").onWhen(body().isEqualTo("Hello World")).to("mock:detour") .transform(constant("Bye World")); from("direct:second").to("mock:bar").to("mock:foo").to("mock:result"); diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptSimpleRouteWhenStopTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptSimpleRouteWhenStopTest.java index ac190c999e8..5f006401376 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptSimpleRouteWhenStopTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptSimpleRouteWhenStopTest.java @@ -54,7 +54,7 @@ public class InterceptSimpleRouteWhenStopTest extends ContextTestSupport { @Override public void configure() { // START SNIPPET: e1 - intercept().when(body().contains("Hello")).to("mock:intercepted").stop(); + intercept().onWhen(body().contains("Hello")).to("mock:intercepted").stop(); from("direct:start").to("mock:foo", "mock:bar", "mock:result"); // END SNIPPET: e1 diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptSimpleRouteWhenTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptSimpleRouteWhenTest.java index 44607cc466f..a24627d8e81 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptSimpleRouteWhenTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptSimpleRouteWhenTest.java @@ -42,7 +42,7 @@ public class InterceptSimpleRouteWhenTest extends ContextTestSupport { @Override public void configure() { // START SNIPPET: e1 - intercept().when(body().contains("Hello")).to("mock:intercepted"); + intercept().onWhen(body().contains("Hello")).to("mock:intercepted"); from("direct:start").to("mock:foo", "mock:bar", "mock:result"); // END SNIPPET: e1 diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedInterceptFromTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedInterceptFromTest.java index 589f5af7d49..11b643e2c70 100644 --- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedInterceptFromTest.java +++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedInterceptFromTest.java @@ -42,7 +42,7 @@ public class ManagedInterceptFromTest extends ManagementTestSupport { return new RouteBuilder() { @Override public void configure() { - interceptFrom().when(simple("${header.foo} == '123'")).to("mock:intercepted"); + interceptFrom().onWhen(simple("${header.foo} == '123'")).to("mock:intercepted"); from("direct:start").to("mock:foo"); } }; diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultInterceptSendToEndpoint.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultInterceptSendToEndpoint.java index ba811529088..e59f7b19d65 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultInterceptSendToEndpoint.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultInterceptSendToEndpoint.java @@ -26,6 +26,7 @@ import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.PollingConsumer; +import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.ShutdownableService; @@ -39,6 +40,7 @@ public class DefaultInterceptSendToEndpoint implements InterceptSendToEndpoint, private final CamelContext camelContext; private final Endpoint delegate; + private Predicate onWhen; private Processor before; private Processor after; private boolean skip; @@ -55,6 +57,14 @@ public class DefaultInterceptSendToEndpoint implements InterceptSendToEndpoint, this.skip = skip; } + public Predicate getOnWhen() { + return onWhen; + } + + public void setOnWhen(Predicate onWhen) { + this.onWhen = onWhen; + } + public void setBefore(Processor before) { this.before = before; } @@ -146,7 +156,7 @@ public class DefaultInterceptSendToEndpoint implements InterceptSendToEndpoint, public AsyncProducer createAsyncProducer() throws Exception { AsyncProducer producer = delegate.createAsyncProducer(); return PluginHelper.getInternalProcessorFactory(camelContext) - .createInterceptSendToEndpointProcessor(this, delegate, producer, skip); + .createInterceptSendToEndpointProcessor(this, delegate, producer, skip, onWhen); } @Override diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java index 400ba3b31b6..66e49a144bb 100644 --- a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java +++ b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java @@ -21233,7 +21233,7 @@ public final class ModelDeserializers extends YamlDeserializerSupport { order = org.apache.camel.dsl.yaml.common.YamlDeserializerResolver.ORDER_LOWEST - 1, displayName = "When Skip Send To Endpoint", description = "Predicate to determine if the message should be sent or not to the endpoint, when using interceptSentToEndpoint.", - deprecated = false, + deprecated = true, properties = { @YamlProperty(name = "__extends", type = "object:org.apache.camel.model.language.ExpressionDefinition", oneOf = "expression"), @YamlProperty(name = "description", type = "string", description = "Sets the description of this node", displayName = "Description"), diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json index df071c6faf0..2f1922c576e 100644 --- a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json +++ b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json @@ -8540,6 +8540,7 @@ "org.apache.camel.model.WhenSkipSendToEndpointDefinition" : { "title" : "When Skip Send To Endpoint", "description" : "Predicate to determine if the message should be sent or not to the endpoint, when using interceptSentToEndpoint.", + "deprecated" : true, "type" : "object", "additionalProperties" : false, "anyOf" : [ {