Repository: camel Updated Branches: refs/heads/master 5e5a4d6db -> 111c01ad7
CAMEL-7609: Added shareUnitOfWork option to enrich. Thanks to metatech for the patch which I adjusted. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/111c01ad Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/111c01ad Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/111c01ad Branch: refs/heads/master Commit: 111c01ad76d9930f0540f83ff5d3adb8c0279788 Parents: 5e5a4d6 Author: Claus Ibsen <davscl...@apache.org> Authored: Sun Jul 12 14:49:13 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Jul 12 15:57:26 2015 +0200 ---------------------------------------------------------------------- .../apache/camel/model/EnrichDefinition.java | 21 +++- .../apache/camel/model/ProcessorDefinition.java | 49 +++++++++ .../org/apache/camel/processor/Enricher.java | 23 +++- .../processor/EnrichSubUnitOfWorkTest.java | 108 +++++++++++++++++++ 4 files changed, 196 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/111c01ad/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java index 5a3662f..6f0e358 100644 --- a/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java @@ -57,7 +57,9 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> imple private Boolean aggregateOnException; @XmlTransient private AggregationStrategy aggregationStrategy; - + @XmlAttribute + private Boolean shareUnitOfWork; + public EnrichDefinition() { this(null, null); } @@ -107,8 +109,9 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> imple } else { endpoint = routeContext.resolveEndpoint(null, resourceRef); } + boolean isShareUnitOfWork = getShareUnitOfWork() != null && getShareUnitOfWork(); - Enricher enricher = new Enricher(null, endpoint.createProducer()); + Enricher enricher = new Enricher(null, endpoint.createProducer(), isShareUnitOfWork); AggregationStrategy strategy = createAggregationStrategy(routeContext); if (strategy == null) { enricher.setDefaultAggregationStrategy(); @@ -232,4 +235,18 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> imple public void setAggregateOnException(Boolean aggregateOnException) { this.aggregateOnException = aggregateOnException; } + + public Boolean getShareUnitOfWork() { + return shareUnitOfWork; + } + + /** + * Shares the {@link org.apache.camel.spi.UnitOfWork} with the parent and the resource exchange. + * Enrich will by default not share unit of work between the parent exchange and the resource exchange. + * This means the resource exchange has its own individual unit of work. + */ + public void setShareUnitOfWork(Boolean shareUnitOfWork) { + this.shareUnitOfWork = shareUnitOfWork; + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/111c01ad/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java index 8aec8b9..c84ea47 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java @@ -3153,6 +3153,8 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> * * @param resourceUri URI of resource endpoint for obtaining additional data. * @param aggregationStrategy aggregation strategy to aggregate input data and additional data. + * @param aggregateOnException whether to call {@link org.apache.camel.processor.aggregate.AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} if + * an exception was thrown. * @return the builder * @see org.apache.camel.processor.Enricher */ @@ -3167,6 +3169,27 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> /** * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> * enriches an exchange with additional data obtained from a <code>resourceUri</code>. + * + * @param resourceUri URI of resource endpoint for obtaining additional data. + * @param aggregationStrategy aggregation strategy to aggregate input data and additional data. + * @param aggregateOnException whether to call {@link org.apache.camel.processor.aggregate.AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} if + * an exception was thrown. + * @param shareUnitOfWork whether to share unit of work + * @return the builder + * @see org.apache.camel.processor.Enricher + */ + @SuppressWarnings("unchecked") + public Type enrich(String resourceUri, AggregationStrategy aggregationStrategy, boolean aggregateOnException, boolean shareUnitOfWork) { + EnrichDefinition enrich = new EnrichDefinition(aggregationStrategy, resourceUri); + enrich.setAggregateOnException(aggregateOnException); + enrich.setShareUnitOfWork(shareUnitOfWork); + addOutput(enrich); + return (Type) this; + } + + /** + * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> + * enriches an exchange with additional data obtained from a <code>resourceUri</code>. * <p/> * The difference between this and {@link #pollEnrich(String)} is that this uses a producer * to obatin the additional data, where as pollEnrich uses a polling consumer. @@ -3228,6 +3251,32 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> /** * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> + * enriches an exchange with additional data obtained from a <code>resourceUri</code>. + * <p/> + * The difference between this and {@link #pollEnrich(String)} is that this uses a producer + * to obtain the additional data, where as pollEnrich uses a polling consumer. + * + * @param resourceRef Reference of resource endpoint for obtaining additional data. + * @param aggregationStrategyRef Reference of aggregation strategy to aggregate input data and additional data. + * @param aggregateOnException whether to call {@link org.apache.camel.processor.aggregate.AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} if + * an exception was thrown. + * @param shareUnitOfWork whether to share unit of work + * @return the builder + * @see org.apache.camel.processor.Enricher + */ + @SuppressWarnings("unchecked") + public Type enrichRef(String resourceRef, String aggregationStrategyRef, boolean aggregateOnException, boolean shareUnitOfWork) { + EnrichDefinition enrich = new EnrichDefinition(); + enrich.setResourceRef(resourceRef); + enrich.setAggregationStrategyRef(aggregationStrategyRef); + enrich.setAggregateOnException(aggregateOnException); + enrich.setShareUnitOfWork(shareUnitOfWork); + addOutput(enrich); + return (Type) this; + } + + /** + * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> * enriches an exchange with additional data obtained from a <code>resourceUri</code> * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint. * <p/> http://git-wip-us.apache.org/repos/asf/camel/blob/111c01ad/camel-core/src/main/java/org/apache/camel/processor/Enricher.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java index 75d2429..b0532ba 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java +++ b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java @@ -23,10 +23,13 @@ import org.apache.camel.Endpoint; import org.apache.camel.EndpointAware; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; +import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.impl.DefaultExchange; import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteContext; +import org.apache.camel.spi.UnitOfWork; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorConverterHelper; import org.apache.camel.util.AsyncProcessorHelper; @@ -57,6 +60,7 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint private AggregationStrategy aggregationStrategy; private Producer producer; private boolean aggregateOnException; + private boolean shareUnitOfWork; /** * Creates a new {@link Enricher}. The default aggregation strategy is to @@ -67,7 +71,7 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint * @param producer producer to resource endpoint. */ public Enricher(Producer producer) { - this(defaultAggregationStrategy(), producer); + this(defaultAggregationStrategy(), producer, false); } /** @@ -75,10 +79,12 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint * * @param aggregationStrategy aggregation strategy to aggregate input data and additional data. * @param producer producer to resource endpoint. + * @param shareUnitOfWork whether to share unit of work */ - public Enricher(AggregationStrategy aggregationStrategy, Producer producer) { + public Enricher(AggregationStrategy aggregationStrategy, Producer producer, boolean shareUnitOfWork) { this.aggregationStrategy = aggregationStrategy; this.producer = producer; + this.shareUnitOfWork = shareUnitOfWork; } public String getId() { @@ -144,7 +150,7 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint public boolean process(final Exchange exchange, final AsyncCallback callback) { final Exchange resourceExchange = createResourceExchange(exchange, ExchangePattern.InOut); final Endpoint destination = producer.getEndpoint(); - + EventHelper.notifyExchangeSending(exchange.getContext(), resourceExchange, destination); // record timing for sending the exchange using the producer final StopWatch watch = new StopWatch(); @@ -247,6 +253,13 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint // copy exchange, and do not share the unit of work Exchange target = ExchangeHelper.createCorrelatedCopy(source, false); target.setPattern(pattern); + + // if we share unit of work, we need to prepare the resource exchange + if (isShareUnitOfWork()) { + target.setProperty(Exchange.PARENT_UNIT_OF_WORK, source.getUnitOfWork()); + // and then share the unit of work + target.setUnitOfWork(source.getUnitOfWork()); + } return target; } @@ -260,6 +273,10 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint return new CopyAggregationStrategy(); } + public boolean isShareUnitOfWork() { + return shareUnitOfWork; + } + @Override public String toString() { return "Enrich[" + producer.getEndpoint() + "]"; http://git-wip-us.apache.org/repos/asf/camel/blob/111c01ad/camel-core/src/test/java/org/apache/camel/processor/EnrichSubUnitOfWorkTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/EnrichSubUnitOfWorkTest.java b/camel-core/src/test/java/org/apache/camel/processor/EnrichSubUnitOfWorkTest.java new file mode 100644 index 0000000..0db19e6 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/EnrichSubUnitOfWorkTest.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; + +/** + * + */ +public class EnrichSubUnitOfWorkTest extends ContextTestSupport { + + private static int counter; + + public void testOK() throws Exception { + counter = 0; + + getMockEndpoint("mock:dead").expectedMessageCount(0); + getMockEndpoint("mock:start").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:b").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World"); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + public void testError() throws Exception { + counter = 0; + + // the DLC should receive the original message which is Bye World + getMockEndpoint("mock:dead").expectedBodiesReceived("Bye World"); + getMockEndpoint("mock:start").expectedBodiesReceived("Bye World"); + getMockEndpoint("mock:b").expectedMessageCount(0); + getMockEndpoint("mock:result").expectedMessageCount(0); + + template.sendBody("direct:start", "Bye World"); + + assertMockEndpointsSatisfied(); + + assertEquals(4, counter); // 1 first + 3 redeliveries + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(deadLetterChannel("mock:dead").useOriginalMessage() + .maximumRedeliveries(3).redeliveryDelay(0)); + + from("direct:start") + .to("mock:start") + .process(new MyPreProcessor()) + .enrich("direct:b", null, false, true) + .to("mock:result"); + + from("direct:b") + .process(new MyProcessor()) + .to("mock:b"); + } + }; + } + + public static class MyPreProcessor implements Processor { + + @Override + public void process(Exchange exchange) throws Exception { + // if its a bye message then alter it to something with + // Donkey to cause a failure in the sub unit of work + // but the DLC should still receive the original input + String body = exchange.getIn().getBody(String.class); + if (body.startsWith("Bye")) { + exchange.getIn().setBody("Donkey was here"); + } + } + } + + public static class MyProcessor implements Processor { + + @Override + public void process(Exchange exchange) throws Exception { + String body = exchange.getIn().getBody(String.class); + if (body.contains("Donkey")) { + counter++; + throw new IllegalArgumentException("Donkey not allowed"); + } + } + } + + +}