This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch 8958 in repository https://gitbox.apache.org/repos/asf/camel.git
commit a1b419bacca8fccb3f4408e32270d8830d678010 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Tue Feb 6 09:55:41 2018 +0100 CAMEL-8958: Claim Check EIP with push/pop. Work in progress. --- camel-core/src/main/docs/eips/claimCheck-eip.adoc | 5 +- .../apache/camel/model/ClaimCheckDefinition.java | 33 ++++++- .../apache/camel/model/ProcessorDefinition.java | 25 +++-- .../processor/ClaimCheckAggregationStrategy.java | 106 +++++++++++++++++---- .../camel/processor/ClaimCheckProcessor.java | 12 ++- .../ClaimCheckEipPushPopExcludeBodyTest.java | 60 ++++++++++++ 6 files changed, 213 insertions(+), 28 deletions(-) diff --git a/camel-core/src/main/docs/eips/claimCheck-eip.adoc b/camel-core/src/main/docs/eips/claimCheck-eip.adoc index e4dc44a..a0ff103 100644 --- a/camel-core/src/main/docs/eips/claimCheck-eip.adoc +++ b/camel-core/src/main/docs/eips/claimCheck-eip.adoc @@ -12,7 +12,7 @@ NOTE: The Camel implementation of this EIP pattern stores the message content te // eip options: START -The Claim Check EIP supports 5 options which are listed below: +The Claim Check EIP supports 6 options which are listed below: [width="100%",cols="2,5,^1,2",options="header"] @@ -20,7 +20,8 @@ The Claim Check EIP supports 5 options which are listed below: | Name | Description | Default | Type | *operation* | *Required* The claim check operation to use. The following operations is supported: Get - Gets (does not remove) the claim check by the given key. GetAndRemove - Gets and remove the claim check by the given key. Set - Sets a new (will override if key already exists) claim check with the given key. Push - Sets a new claim check on the stack (does not use key). Pop - Gets the latest claim check from the stack (does not use key). | | ClaimCheckOperation | *key* | To use a specific key for claim check id. | | String -| *include* | What data to include when merging data back from claim check repository. The following syntax is supported: body - to aggregate the message body headers - to aggregate all the message headers header:pattern - to aggregate all the message headers that matches the pattern. The pattern syntax is documented by: link EndpointHelpermatchPattern(String String). You can specify multiple rules separated by comma. For example to include the message body and all headers starting with [...] +| *include* | What data to include when merging data back from claim check repository. The following syntax is supported: body - to aggregate the message body headers - to aggregate all the message headers header:pattern - to aggregate all the message headers that matches the pattern. The pattern syntax is documented by: link EndpointHelpermatchPattern(String String). You can specify multiple rules separated by comma. For example to include the message body and all headers starting with [...] +| *exclude* | What data to exclude when merging data back from claim check repository. The following syntax is supported: body - to aggregate the message body headers - to aggregate all the message headers header:pattern - to aggregate all the message headers that matches the pattern. The pattern syntax is documented by: link EndpointHelpermatchPattern(String String). You can specify multiple rules separated by comma. For example to exclude the message body and all headers starting with [...] | *strategyRef* | To use a custom AggregationStrategy instead of the default implementation. Notice you cannot use both custom aggregation strategy and configure data at the same time. | | String | *strategyMethodName* | This option can be used to explicit declare the method name to use when using POJOs as the AggregationStrategy. | | String |=== diff --git a/camel-core/src/main/java/org/apache/camel/model/ClaimCheckDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ClaimCheckDefinition.java index a78fe60..4dc1609 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ClaimCheckDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ClaimCheckDefinition.java @@ -47,6 +47,8 @@ public class ClaimCheckDefinition extends NoOutputDefinition<ClaimCheckDefinitio private String key; @XmlAttribute private String include; + @XmlAttribute + private String exclude; @XmlAttribute(name = "strategyRef") @Metadata(label = "advanced") private String aggregationStrategyRef; @XmlAttribute(name = "strategyMethodName") @Metadata(label = "advanced") @@ -79,6 +81,7 @@ public class ClaimCheckDefinition extends NoOutputDefinition<ClaimCheckDefinitio claim.setOperation(operation.name()); claim.setKey(getKey()); claim.setInclude(getInclude()); + claim.setExclude(getExclude()); AggregationStrategy strategy = createAggregationStrategy(routeContext); if (strategy != null) { @@ -86,7 +89,7 @@ public class ClaimCheckDefinition extends NoOutputDefinition<ClaimCheckDefinitio } // only data or aggregation strategy can be configured not both - if (getInclude() != null && strategy != null) { + if ((getInclude() != null || getExclude() != null) && strategy != null) { throw new IllegalArgumentException("Cannot use both include/exclude and custom aggregation strategy on ClaimCheck EIP"); } @@ -153,6 +156,7 @@ public class ClaimCheckDefinition extends NoOutputDefinition<ClaimCheckDefinitio * You can specify multiple rules separated by comma. For example to include the message body and all headers starting with foo * <tt>body,header:foo*</tt>. * If the include rule is specified as empty or as wildcard then everything is included. + * If you have configured both include and exclude then exclude take precedence over include. */ public ClaimCheckDefinition include(String include) { setInclude(include); @@ -160,6 +164,25 @@ public class ClaimCheckDefinition extends NoOutputDefinition<ClaimCheckDefinitio } /** + * What data to exclude when merging data back from claim check repository. + * + * The following syntax is supported: + * <ul> + * <li>body</li> - to aggregate the message body + * <li>headers</li> - to aggregate all the message headers + * <li>header:pattern</li> - to aggregate all the message headers that matches the pattern. + * The pattern syntax is documented by: {@link EndpointHelper#matchPattern(String, String)}. + * </ul> + * You can specify multiple rules separated by comma. For example to exclude the message body and all headers starting with bar + * <tt>body,header:bar*</tt>. + * If you have configured both include and exclude then exclude take precedence over include. + */ + public ClaimCheckDefinition exclude(String exclude) { + setExclude(exclude); + return this; + } + + /** * To use a custom {@link AggregationStrategy} instead of the default implementation. * Notice you cannot use both custom aggregation strategy and configure data at the same time. */ @@ -212,6 +235,14 @@ public class ClaimCheckDefinition extends NoOutputDefinition<ClaimCheckDefinitio this.include = include; } + public String getExclude() { + return exclude; + } + + public void setExclude(String exclude) { + this.exclude = exclude; + } + public String getAggregationStrategyRef() { return aggregationStrategyRef; } diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java index ac0e635..39a0c6d 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java @@ -3478,11 +3478,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> * @param key the unique key to use for the get and set operations, can be <tt>null</tt> for push/pop operations */ public Type claimCheck(ClaimCheckOperation operation, String key) { - ClaimCheckDefinition answer = new ClaimCheckDefinition(); - answer.setOperation(operation); - answer.setKey(key); - addOutput(answer); - return (Type) this; + return claimCheck(operation, key, null, null); } /** @@ -3492,13 +3488,30 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> * * @param operation the claim check operation to use. * @param key the unique key to use for the get and set operations, can be <tt>null</tt> for push/pop operations - * @param include describes what data to include and retrieve and merge back when using get or pop operations. + * @param include describes what data to include when merging data back when using get or pop operations. */ public Type claimCheck(ClaimCheckOperation operation, String key, String include) { + return claimCheck(operation, key, include, null); + } + + /** + * The <a href="http://camel.apache.org/claim-check.html">Claim Check EIP</a> + * allows you to replace message content with a claim check (a unique key), + * which can be used to retrieve the message content at a later time. + * + * @param operation the claim check operation to use. + * @param key the unique key to use for the get and set operations, can be <tt>null</tt> for push/pop operations + * @param include describes what data to include when merging data back when using get or pop operations. + * If you have configured both include and exclude then exclude take precedence over include. + * @param exclude describes what data to exclude when merging data back when using get or pop operations. + * If you have configured both include and exclude then exclude take precedence over include. + */ + public Type claimCheck(ClaimCheckOperation operation, String key, String include, String exclude) { ClaimCheckDefinition answer = new ClaimCheckDefinition(); answer.setOperation(operation); answer.setKey(key); answer.setInclude(include); + answer.setExclude(exclude); addOutput(answer); return (Type) this; } diff --git a/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckAggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckAggregationStrategy.java index 02e0d7d..fb58346 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckAggregationStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckAggregationStrategy.java @@ -23,6 +23,8 @@ import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.util.EndpointHelper; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.StringHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Default {@link AggregationStrategy} used by the {@link ClaimCheckProcessor} EIP. @@ -37,10 +39,13 @@ import org.apache.camel.util.StringHelper; * You can specify multiple rules separated by comma. For example to include the message body and all headers starting with foo * <tt>body,header:foo*</tt>. * If the include rule is specified as empty or as wildcard then everything is merged. + * If you have configured both include and exclude then exclude take precedence over include. */ public class ClaimCheckAggregationStrategy implements AggregationStrategy { + private static final Logger LOG = LoggerFactory.getLogger(ClaimCheckAggregationStrategy.class); private String include; + private String exclude; public ClaimCheckAggregationStrategy() { } @@ -53,34 +58,82 @@ public class ClaimCheckAggregationStrategy implements AggregationStrategy { this.include = include; } + public String getExclude() { + return exclude; + } + + public void setExclude(String exclude) { + this.exclude = exclude; + } + @Override public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (newExchange == null) { return oldExchange; } - if (ObjectHelper.isEmpty(include) || "*".equals(include)) { - // grab everything if data is empty or wildcard + if (ObjectHelper.isEmpty(exclude) && (ObjectHelper.isEmpty(include) || "*".equals(include))) { + // grab everything if include is empty or wildcard (and exclude is not in use) return newExchange; } - Iterable it = ObjectHelper.createIterable(include, ","); - for (Object k : it) { - String part = k.toString(); - if ("body".equals(part)) { + // if we have include + if (ObjectHelper.isNotEmpty(include)) { + Iterable it = ObjectHelper.createIterable(include, ","); + for (Object k : it) { + String part = k.toString(); + if ("body".equals(part) && !isExcluded("body")) { + oldExchange.getMessage().setBody(newExchange.getMessage().getBody()); + LOG.trace("Including: body"); + } else if ("headers".equals(part) && !isExcluded("headers")) { + oldExchange.getMessage().getHeaders().putAll(newExchange.getMessage().getHeaders()); + LOG.trace("Including: headers"); + } else if (part.startsWith("header:")) { + // pattern matching for headers, eg header:foo, header:foo*, header:(foo|bar) + String after = StringHelper.after(part, "header:"); + Iterable i = ObjectHelper.createIterable(after, ","); + for (Object o : i) { + String pattern = o.toString(); + for (Map.Entry<String, Object> header : newExchange.getMessage().getHeaders().entrySet()) { + String key = header.getKey(); + boolean matched = EndpointHelper.matchPattern(key, pattern); + if (matched && !isExcluded(key)) { + LOG.trace("Including: header:{}", key); + oldExchange.getMessage().getHeaders().put(key, header.getValue()); + } + } + } + } + } + } else if (ObjectHelper.isNotEmpty(exclude)) { + // grab body unless its excluded + if (!isExcluded("body")) { oldExchange.getMessage().setBody(newExchange.getMessage().getBody()); - } else if ("headers".equals(part)) { - oldExchange.getMessage().getHeaders().putAll(newExchange.getMessage().getHeaders()); - } else if (part.startsWith("header:")) { - // pattern matching for headers, eg header:foo, header:foo*, header:(foo|bar) - String after = StringHelper.after(part, "header:"); - Iterable i = ObjectHelper.createIterable(after, ","); - for (Object o : i) { - String pattern = o.toString(); - for (Map.Entry<String, Object> header : newExchange.getMessage().getHeaders().entrySet()) { - String key = header.getKey(); - if (EndpointHelper.matchPattern(key, pattern)) { - oldExchange.getMessage().getHeaders().put(key, header.getValue()); + LOG.trace("Including: body"); + } + + // if not all headers is excluded, then check each header one-by-one + if (!isExcluded("headers")) { + // check if we exclude a specific headers + Iterable it = ObjectHelper.createIterable(exclude, ","); + for (Object k : it) { + String part = k.toString(); + if (part.startsWith("header:")) { + // pattern matching for headers, eg header:foo, header:foo*, header:(foo|bar) + String after = StringHelper.after(part, "header:"); + Iterable i = ObjectHelper.createIterable(after, ","); + for (Object o : i) { + String pattern = o.toString(); + for (Map.Entry<String, Object> header : newExchange.getMessage().getHeaders().entrySet()) { + String key = header.getKey(); + boolean excluded = EndpointHelper.matchPattern(key, pattern); + if (!excluded) { + LOG.trace("Including: header:{}", key); + oldExchange.getMessage().getHeaders().put(key, header.getValue()); + } else { + LOG.trace("Excluding: header:{}", key); + } + } } } } @@ -89,4 +142,21 @@ public class ClaimCheckAggregationStrategy implements AggregationStrategy { return oldExchange; } + + private boolean isExcluded(String key) { + if (ObjectHelper.isEmpty(exclude)) { + return false; + } + String[] excludes = exclude.split(","); + for (String pattern : excludes) { + if (pattern.startsWith("header:")) { + pattern = StringHelper.after(pattern, "header:"); + } + if (EndpointHelper.matchPattern(key, pattern)) { + LOG.trace("Excluding: {}", key); + return true; + } + } + return false; + } } diff --git a/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckProcessor.java index 137ecbf..2421878 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckProcessor.java @@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory; * <p/> * The current Claim Check EIP implementation in Camel is only intended for temporary memory repository. Likewise * the repository is not shared among {@link Exchange}s, but a private instance is created per {@link Exchange}. - * This guards against concurrent and thread-safe issues. For off-memeory persistent storage of data, then use + * This guards against concurrent and thread-safe issues. For off-memory persistent storage of data, then use * any of the many Camel components that support persistent storage, and do not use this Claim Check EIP implementation. */ public class ClaimCheckProcessor extends ServiceSupport implements AsyncProcessor, IdAware, CamelContextAware { @@ -50,6 +50,7 @@ public class ClaimCheckProcessor extends ServiceSupport implements AsyncProcesso private AggregationStrategy aggregationStrategy; private String key; private String include; + private String exclude; @Override public CamelContext getCamelContext() { @@ -103,6 +104,14 @@ public class ClaimCheckProcessor extends ServiceSupport implements AsyncProcesso this.include = include; } + public String getExclude() { + return exclude; + } + + public void setExclude(String exclude) { + this.exclude = exclude; + } + public void process(Exchange exchange) throws Exception { AsyncProcessorHelper.process(this, exchange); } @@ -198,6 +207,7 @@ public class ClaimCheckProcessor extends ServiceSupport implements AsyncProcesso protected AggregationStrategy createAggregationStrategy() { ClaimCheckAggregationStrategy answer = new ClaimCheckAggregationStrategy(); answer.setInclude(include); + answer.setExclude(exclude); return answer; } } diff --git a/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipPushPopExcludeBodyTest.java b/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipPushPopExcludeBodyTest.java new file mode 100644 index 0000000..6ddbb29 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipPushPopExcludeBodyTest.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.model.ClaimCheckOperation; + +public class ClaimCheckEipPushPopExcludeBodyTest extends ContextTestSupport { + + public void testPushPopBodyExclude() throws Exception { + getMockEndpoint("mock:a").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:a").expectedHeaderReceived("foo", 123); + getMockEndpoint("mock:a").expectedHeaderReceived("bar", "Moes"); + getMockEndpoint("mock:b").expectedBodiesReceived("Bye World"); + getMockEndpoint("mock:b").expectedHeaderReceived("foo", 456); + getMockEndpoint("mock:b").expectedHeaderReceived("bar", "Jacks"); + getMockEndpoint("mock:c").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:c").expectedHeaderReceived("foo", 123); + getMockEndpoint("mock:c").expectedHeaderReceived("bar", "Jacks"); + + template.sendBodyAndHeader("direct:start", "Hello World", "foo", 123); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .setHeader("bar", constant("Moes")) + .to("mock:a") + .claimCheck(ClaimCheckOperation.Push) + .transform().constant("Bye World") + .setHeader("foo", constant(456)) + .setHeader("bar", constant("Jacks")) + .to("mock:b") + // skip the foo header + .claimCheck(ClaimCheckOperation.Pop, null, null, "header:bar") + .to("mock:c"); + } + }; + } +} -- To stop receiving notification emails like this one, please contact davscl...@apache.org.