CAMEL-9444: Fix using shareUnitOfWork with multicast and using onException with unhandled=false.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/baece126 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/baece126 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/baece126 Branch: refs/heads/master Commit: baece126edb7dd9ca9507534c522e9996e724d87 Parents: fffafeb Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Feb 12 17:09:29 2016 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Feb 12 17:54:51 2016 +0100 ---------------------------------------------------------------------- .../apache/camel/model/MulticastDefinition.java | 20 ++-- .../apache/camel/model/ProcessorDefinition.java | 11 +- .../camel/model/RecipientListDefinition.java | 9 +- .../org/apache/camel/model/SplitDefinition.java | 12 +++ .../apache/camel/processor/RecipientList.java | 10 +- .../org/apache/camel/processor/Splitter.java | 6 +- .../ShareUnitOfWorkAggregationStrategy.java | 77 ++++++++++++++ ...tOfWorkOnExceptionHandledFalseIssueTest.java | 2 +- .../MulticastCopyOfSplitSubUnitOfWorkTest.java | 102 +++++++++++++++++++ .../camel/processor/SplitSubUnitOfWorkTest.java | 1 + 10 files changed, 222 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/baece126/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java index 55f6ad0..42b3e59 100644 --- a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java @@ -31,6 +31,7 @@ import org.apache.camel.processor.CamelInternalProcessor; import org.apache.camel.processor.MulticastProcessor; import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; +import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy; import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.RouteContext; @@ -287,11 +288,7 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i } protected Processor createCompositeProcessor(RouteContext routeContext, List<Processor> list) throws Exception { - AggregationStrategy strategy = createAggregationStrategy(routeContext); - if (strategy == null) { - // default to use latest aggregation strategy - strategy = new UseLatestAggregationStrategy(); - } + final AggregationStrategy strategy = createAggregationStrategy(routeContext); boolean isParallelProcessing = getParallelProcessing() != null && getParallelProcessing(); boolean isShareUnitOfWork = getShareUnitOfWork() != null && getShareUnitOfWork(); @@ -333,14 +330,23 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i } } - if (strategy != null && strategy instanceof CamelContextAware) { + if (strategy == null) { + // default to use latest aggregation strategy + strategy = new UseLatestAggregationStrategy(); + } + + if (strategy instanceof CamelContextAware) { ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext()); } + if (shareUnitOfWork != null && shareUnitOfWork) { + // wrap strategy in share unit of work + strategy = new ShareUnitOfWorkAggregationStrategy(strategy); + } + return strategy; } - public AggregationStrategy getAggregationStrategy() { return aggregationStrategy; } http://git-wip-us.apache.org/repos/asf/camel/blob/baece126/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java index 0705d69..eacb304 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java @@ -54,7 +54,6 @@ import org.apache.camel.model.language.ExpressionDefinition; import org.apache.camel.model.language.LanguageExpression; import org.apache.camel.model.language.SimpleExpression; import org.apache.camel.model.rest.RestDefinition; -import org.apache.camel.processor.CamelInternalProcessor; import org.apache.camel.processor.InterceptEndpointProcessor; import org.apache.camel.processor.Pipeline; import org.apache.camel.processor.aggregate.AggregationStrategy; @@ -535,16 +534,10 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> processor = createProcessor(routeContext); } - // unwrap internal processor so we can set id on the actual processor - Processor idProcessor = processor; - if (processor instanceof CamelInternalProcessor) { - idProcessor = ((CamelInternalProcessor) processor).getProcessor(); - } - // inject id - if (idProcessor instanceof IdAware) { + if (processor instanceof IdAware) { String id = this.idOrCreate(routeContext.getCamelContext().getNodeIdFactory()); - ((IdAware) idProcessor).setId(id); + ((IdAware) processor).setId(id); } if (processor == null) { http://git-wip-us.apache.org/repos/asf/camel/blob/baece126/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java index 49d75f9..0d02a48 100644 --- a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java @@ -34,6 +34,7 @@ import org.apache.camel.processor.Pipeline; import org.apache.camel.processor.RecipientList; import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; +import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy; import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.RouteContext; @@ -192,8 +193,9 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + strategyRef); } } + if (strategy == null) { - // fallback to use latest + // default to use latest aggregation strategy strategy = new UseLatestAggregationStrategy(); } @@ -201,6 +203,11 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext()); } + if (shareUnitOfWork != null && shareUnitOfWork) { + // wrap strategy in share unit of work + strategy = new ShareUnitOfWorkAggregationStrategy(strategy); + } + return strategy; } http://git-wip-us.apache.org/repos/asf/camel/blob/baece126/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java b/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java index ccfd045..5e49de2 100644 --- a/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java @@ -31,6 +31,7 @@ import org.apache.camel.processor.CamelInternalProcessor; import org.apache.camel.processor.Splitter; import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; +import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.RouteContext; import org.apache.camel.util.CamelContextHelper; @@ -119,6 +120,12 @@ public class SplitDefinition extends ExpressionNode implements ExecutorServiceAw Splitter answer = new Splitter(routeContext.getCamelContext(), exp, childProcessor, aggregationStrategy, isParallelProcessing, threadPool, shutdownThreadPool, isStreaming, isStopOnException(), timeout, onPrepare, isShareUnitOfWork, isParallelAggregate); +// if (isShareUnitOfWork) { + // wrap answer in a sub unit of work, since we share the unit of work +// CamelInternalProcessor internalProcessor = new CamelInternalProcessor(answer); +// internalProcessor.addAdvice(new CamelInternalProcessor.SubUnitOfWorkProcessorAdvice()); +// return internalProcessor; +// } return answer; } @@ -144,6 +151,11 @@ public class SplitDefinition extends ExpressionNode implements ExecutorServiceAw ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext()); } + if (strategy != null && shareUnitOfWork != null && shareUnitOfWork) { + // wrap strategy in share unit of work + strategy = new ShareUnitOfWorkAggregationStrategy(strategy); + } + return strategy; } http://git-wip-us.apache.org/repos/asf/camel/blob/baece126/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java index 98f8e45..ded8ca9 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java @@ -166,16 +166,8 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor, IdA return true; } - AsyncProcessor target = rlp; - if (isShareUnitOfWork()) { - // wrap answer in a sub unit of work, since we share the unit of work - CamelInternalProcessor internalProcessor = new CamelInternalProcessor(rlp); - internalProcessor.addAdvice(new CamelInternalProcessor.SubUnitOfWorkProcessorAdvice()); - target = internalProcessor; - } - // now let the multicast process the exchange - return target.process(exchange, callback); + return rlp.process(exchange, callback); } protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) { http://git-wip-us.apache.org/repos/asf/camel/blob/baece126/camel-core/src/main/java/org/apache/camel/processor/Splitter.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java index 55a9bd9..40ca426 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java +++ b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java @@ -36,6 +36,7 @@ import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; import org.apache.camel.Traceable; import org.apache.camel.processor.aggregate.AggregationStrategy; +import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy; import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy; import org.apache.camel.spi.RouteContext; import org.apache.camel.util.ExchangeHelper; @@ -97,7 +98,10 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac // and propagate exceptions which is done by a per exchange specific aggregation strategy // to ensure it supports async routing if (strategy == null) { - UseOriginalAggregationStrategy original = new UseOriginalAggregationStrategy(exchange, true); + AggregationStrategy original = new UseOriginalAggregationStrategy(exchange, true); + if (isShareUnitOfWork()) { + original = new ShareUnitOfWorkAggregationStrategy(original); + } setAggregationStrategyOnExchange(exchange, original); } http://git-wip-us.apache.org/repos/asf/camel/blob/baece126/camel-core/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java new file mode 100644 index 0000000..4a1187f --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregate; + +import org.apache.camel.Exchange; + +import static org.apache.camel.util.ExchangeHelper.hasExceptionBeenHandledByErrorHandler; + +/** + * An {@link AggregationStrategy} which are used when the option <tt>shareUnitOfWork</tt> is enabled + * on EIPs such as multicast, splitter or recipientList. + * <p/> + * This strategy wraps the actual in use strategy to provide the logic needed for making shareUnitOfWork work. + * <p/> + * This strategy is <b>not</b> intended for end users to use. + */ +public final class ShareUnitOfWorkAggregationStrategy implements AggregationStrategy { + + private final AggregationStrategy strategy; + + public ShareUnitOfWorkAggregationStrategy(AggregationStrategy strategy) { + this.strategy = strategy; + } + + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + // aggreagate using the actual strategy first + Exchange answer = strategy.aggregate(oldExchange, newExchange); + // ensure any errors is propagated from the new exchange to the answer + propagateFailure(answer, newExchange); + + return answer; + } + + protected void propagateFailure(Exchange answer, Exchange newExchange) { + // if new exchange failed then propagate all the error related properties to the answer + boolean exceptionHandled = hasExceptionBeenHandledByErrorHandler(newExchange); + if (newExchange.isFailed() || newExchange.isRollbackOnly() || exceptionHandled) { + if (newExchange.getException() != null) { + answer.setException(newExchange.getException()); + } + if (newExchange.getProperty(Exchange.EXCEPTION_CAUGHT) != null) { + answer.setProperty(Exchange.EXCEPTION_CAUGHT, newExchange.getProperty(Exchange.EXCEPTION_CAUGHT)); + } + if (newExchange.getProperty(Exchange.FAILURE_ENDPOINT) != null) { + answer.setProperty(Exchange.FAILURE_ENDPOINT, newExchange.getProperty(Exchange.FAILURE_ENDPOINT)); + } + if (newExchange.getProperty(Exchange.FAILURE_ROUTE_ID) != null) { + answer.setProperty(Exchange.FAILURE_ROUTE_ID, newExchange.getProperty(Exchange.FAILURE_ROUTE_ID)); + } + if (newExchange.getProperty(Exchange.ERRORHANDLER_HANDLED) != null) { + answer.setProperty(Exchange.ERRORHANDLER_HANDLED, newExchange.getProperty(Exchange.ERRORHANDLER_HANDLED)); + } + if (newExchange.getProperty(Exchange.FAILURE_HANDLED) != null) { + answer.setProperty(Exchange.FAILURE_HANDLED, newExchange.getProperty(Exchange.FAILURE_HANDLED)); + } + } + } + + @Override + public String toString() { + return "ShareUnitOfWorkAggregationStrategy"; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/baece126/camel-core/src/test/java/org/apache/camel/issues/RecipientListShareUnitOfWorkOnExceptionHandledFalseIssueTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/issues/RecipientListShareUnitOfWorkOnExceptionHandledFalseIssueTest.java b/camel-core/src/test/java/org/apache/camel/issues/RecipientListShareUnitOfWorkOnExceptionHandledFalseIssueTest.java index ef33c9a..55fe155 100644 --- a/camel-core/src/test/java/org/apache/camel/issues/RecipientListShareUnitOfWorkOnExceptionHandledFalseIssueTest.java +++ b/camel-core/src/test/java/org/apache/camel/issues/RecipientListShareUnitOfWorkOnExceptionHandledFalseIssueTest.java @@ -31,7 +31,7 @@ public class RecipientListShareUnitOfWorkOnExceptionHandledFalseIssueTest extend template.sendBodyAndHeader("direct:start", "Hello World", "foo", "direct:b,direct:c"); fail("Should throw exception"); } catch (Exception e) { - IllegalArgumentException cause = assertIsInstanceOf(IllegalArgumentException.class, e.getCause()); + IllegalArgumentException cause = assertIsInstanceOf(IllegalArgumentException.class, e.getCause().getCause()); assertEquals("Forced", cause.getMessage()); } http://git-wip-us.apache.org/repos/asf/camel/blob/baece126/camel-core/src/test/java/org/apache/camel/processor/MulticastCopyOfSplitSubUnitOfWorkTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/MulticastCopyOfSplitSubUnitOfWorkTest.java b/camel-core/src/test/java/org/apache/camel/processor/MulticastCopyOfSplitSubUnitOfWorkTest.java new file mode 100644 index 0000000..ebf4daf --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/MulticastCopyOfSplitSubUnitOfWorkTest.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; + +/** + * + */ +public class MulticastCopyOfSplitSubUnitOfWorkTest extends ContextTestSupport { + + private static int counter; + + public void testOK() throws Exception { + counter = 0; + + getMockEndpoint("mock:dead").expectedMessageCount(0); + getMockEndpoint("mock:a").expectedMessageCount(1); + getMockEndpoint("mock:b").expectedMessageCount(1); + getMockEndpoint("mock:result").expectedMessageCount(1); + getMockEndpoint("mock:line").expectedMessageCount(1); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + public void testError() throws Exception { + counter = 0; + + getMockEndpoint("mock:dead").expectedMessageCount(1); + getMockEndpoint("mock:a").expectedMessageCount(1); + getMockEndpoint("mock:b").expectedMessageCount(1); + getMockEndpoint("mock:result").expectedMessageCount(0); + getMockEndpoint("mock:line").expectedMessageCount(0); + + template.sendBody("direct:start", "Hello Donkey"); + + assertMockEndpointsSatisfied(); + + assertEquals(4, counter); // 1 first + 3 redeliveries + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // START SNIPPET: e1 + errorHandler(deadLetterChannel("mock:dead").useOriginalMessage() + .maximumRedeliveries(3).redeliveryDelay(0)); + + from("direct:start") + .to("mock:a") + // share unit of work in the multicast, which tells Camel to propagate failures from + // processing the multicast messages back to the result of the splitter, which allows + // it to act as a combined unit of work + .multicast().shareUnitOfWork() + .to("mock:b") + .to("direct:line") + .end() + .to("mock:result"); + + from("direct:line") + .to("log:line") + .process(new MyProcessor()) + .to("mock:line"); + // END SNIPPET: e1 + } + }; + } + + public static class MyProcessor implements Processor { + + @Override + public void process(Exchange exchange) throws Exception { + String body = exchange.getIn().getBody(String.class); + if (body.contains("Donkey")) { + counter++; + throw new IllegalArgumentException("Donkey not allowed"); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/baece126/camel-core/src/test/java/org/apache/camel/processor/SplitSubUnitOfWorkTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/SplitSubUnitOfWorkTest.java b/camel-core/src/test/java/org/apache/camel/processor/SplitSubUnitOfWorkTest.java index 25fe6cc..0be2fea 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/SplitSubUnitOfWorkTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/SplitSubUnitOfWorkTest.java @@ -20,6 +20,7 @@ import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; /** *