Repository: camel Updated Branches: refs/heads/master c475fb661 -> f4d9d3c33
CAMEL-5286: More flexible onCompletion allow to configure mode before/after consumer. And whether to use async/sync with thread pool or not. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f4d9d3c3 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f4d9d3c3 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f4d9d3c3 Branch: refs/heads/master Commit: f4d9d3c3397dc91aaccc980c6afc753d3df28e09 Parents: c475fb6 Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Aug 12 16:39:48 2014 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Aug 12 16:39:48 2014 +0200 ---------------------------------------------------------------------- .../camel/model/OnCompletionDefinition.java | 71 +++++++++- .../apache/camel/model/OnCompletionMode.java | 28 ++++ .../camel/processor/OnCompletionProcessor.java | 135 +++++++++++++++---- .../resources/org/apache/camel/model/jaxb.index | 1 + .../camel/processor/OnCompletionAsyncTest.java | 10 +- .../camel/processor/OnCompletionModeTest.java | 69 ++++++++++ .../OnCompletionParallelProcessingTest.java | 49 +++++++ .../OnCompletionUseOriginalBodyTest.java | 2 +- ...ompletionAndInterceptAndOnExceptionTest.java | 4 +- .../component/jms/JmsOnCompletionTest.java | 2 +- .../scala/dsl/SOnCompletionDefinition.scala | 8 +- .../camel/scala/dsl/SOnCompletionModeTest.scala | 48 +++++++ .../dsl/SOnCompletionOnCompleteOnlyTest.scala | 2 +- .../dsl/SOnCompletionOnFailureOnlyTest.scala | 2 +- .../processor/SpringOnCompletionModeTest.java | 33 +++++ .../processor/SpringOnCompletionModeTest.xml | 59 ++++++++ .../SpringOnCompletionUseOriginalBodyTest.xml | 2 +- 17 files changed, 480 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java b/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java index 001c0fe..1e2bb7e 100644 --- a/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java @@ -46,12 +46,16 @@ import org.apache.camel.spi.RouteContext; @XmlAccessorType(XmlAccessType.FIELD) public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefinition> implements ExecutorServiceAwareDefinition<OnCompletionDefinition> { @XmlAttribute + private OnCompletionMode mode; + @XmlAttribute private Boolean onCompleteOnly; @XmlAttribute private Boolean onFailureOnly; @XmlElement(name = "onWhen") private WhenDefinition onWhen; @XmlAttribute + private Boolean parallelProcessing; + @XmlAttribute private String executorServiceRef; @XmlAttribute(name = "useOriginalMessage") private Boolean useOriginalMessagePolicy; @@ -137,14 +141,16 @@ public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefi when = onWhen.getExpression().createPredicate(routeContext); } - // executor service is mandatory for on completion - boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, true); - ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "OnCompletion", this, true); + boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing()); + ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "OnCompletion", this, isParallelProcessing()); + + // should be after consumer by default + boolean afterConsumer = mode == null || mode == OnCompletionMode.AfterConsumer; // should be false by default boolean original = getUseOriginalMessagePolicy() != null ? getUseOriginalMessagePolicy() : false; OnCompletionProcessor answer = new OnCompletionProcessor(routeContext.getCamelContext(), internal, - threadPool, shutdownThreadPool, isOnCompleteOnly(), isOnFailureOnly(), when, original); + threadPool, shutdownThreadPool, isOnCompleteOnly(), isOnFailureOnly(), when, original, afterConsumer); return answer; } @@ -173,6 +179,32 @@ public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefi } /** + * Sets the mode to be after route is done (default due backwards compatible). + * <p/> + * This executes the on completion work <i>after</i> the route consumer have written response + * back to the callee (if its InOut mode). + * + * @return the builder + */ + public OnCompletionDefinition modeAfterConsumer() { + setMode(OnCompletionMode.AfterConsumer); + return this; + } + + /** + * Sets the mode to be before consumer is done. + * <p/> + * This allows the on completion work to execute <i>before</i> the route consumer, writes any response + * back to the callee (if its InOut mode). + * + * @return the builder + */ + public OnCompletionDefinition modeBeforeConsumer() { + setMode(OnCompletionMode.BeforeConsumer); + return this; + } + + /** * Will only synchronize when the {@link org.apache.camel.Exchange} completed successfully (no errors). * * @return the builder @@ -239,6 +271,16 @@ public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefi return this; } + /** + * Doing the on completion work in parallel + * + * @return the builder + */ + public OnCompletionDefinition parallelProcessing() { + setParallelProcessing(true); + return this; + } + public List<ProcessorDefinition<?>> getOutputs() { return outputs; } @@ -251,6 +293,14 @@ public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefi return true; } + public OnCompletionMode getMode() { + return mode; + } + + public void setMode(OnCompletionMode mode) { + this.mode = mode; + } + public Boolean getOnCompleteOnly() { return onCompleteOnly; } @@ -307,4 +357,17 @@ public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefi this.useOriginalMessagePolicy = useOriginalMessagePolicy; } + public Boolean getParallelProcessing() { + return parallelProcessing; + } + + public void setParallelProcessing(Boolean parallelProcessing) { + this.parallelProcessing = parallelProcessing; + } + + public boolean isParallelProcessing() { + return parallelProcessing != null && parallelProcessing; + } + + } http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/camel-core/src/main/java/org/apache/camel/model/OnCompletionMode.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/OnCompletionMode.java b/camel-core/src/main/java/org/apache/camel/model/OnCompletionMode.java new file mode 100644 index 0000000..1d6a800 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/model/OnCompletionMode.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.model; + +import javax.xml.bind.annotation.XmlEnum; +import javax.xml.bind.annotation.XmlType; + +@XmlType +@XmlEnum(String.class) +public enum OnCompletionMode { + + AfterConsumer, BeforeConsumer + +} http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java index 3604a5c..35b0a52 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java @@ -28,6 +28,7 @@ import org.apache.camel.Message; import org.apache.camel.Ordered; import org.apache.camel.Predicate; import org.apache.camel.Processor; +import org.apache.camel.Route; import org.apache.camel.Traceable; import org.apache.camel.support.ServiceSupport; import org.apache.camel.support.SynchronizationAdapter; @@ -53,9 +54,10 @@ public class OnCompletionProcessor extends ServiceSupport implements AsyncProces private final boolean onFailureOnly; private final Predicate onWhen; private final boolean useOriginalBody; + private final boolean afterConsumer; public OnCompletionProcessor(CamelContext camelContext, Processor processor, ExecutorService executorService, boolean shutdownExecutorService, - boolean onCompleteOnly, boolean onFailureOnly, Predicate onWhen, boolean useOriginalBody) { + boolean onCompleteOnly, boolean onFailureOnly, Predicate onWhen, boolean useOriginalBody, boolean afterConsumer) { notNull(camelContext, "camelContext"); notNull(processor, "processor"); this.camelContext = camelContext; @@ -66,6 +68,7 @@ public class OnCompletionProcessor extends ServiceSupport implements AsyncProces this.onFailureOnly = onFailureOnly; this.onWhen = onWhen; this.useOriginalBody = useOriginalBody; + this.afterConsumer = afterConsumer; } @Override @@ -97,13 +100,22 @@ public class OnCompletionProcessor extends ServiceSupport implements AsyncProces public boolean process(Exchange exchange, AsyncCallback callback) { if (processor != null) { // register callback - exchange.getUnitOfWork().addSynchronization(new OnCompletionSynchronization()); + if (afterConsumer) { + exchange.getUnitOfWork().addSynchronization(new OnCompletionSynchronizationAfterConsumer()); + } else { + exchange.getUnitOfWork().addSynchronization(new OnCompletionSynchronizationBeforeConsumer()); + } } callback.done(true); return true; } + protected boolean isCreateCopy() { + // we need to create a correlated copy if we run in parallel mode + return executorService != null; + } + /** * Processes the exchange by the processors * @@ -127,17 +139,22 @@ public class OnCompletionProcessor extends ServiceSupport implements AsyncProces protected Exchange prepareExchange(Exchange exchange) { Exchange answer; - // for asynchronous routing we must use a copy as we dont want it - // to cause side effects of the original exchange - // (the original thread will run in parallel) - answer = ExchangeHelper.createCorrelatedCopy(exchange, false); - if (answer.hasOut()) { - // move OUT to IN (pipes and filters) - answer.setIn(answer.getOut()); - answer.setOut(null); + if (isCreateCopy()) { + // for asynchronous routing we must use a copy as we dont want it + // to cause side effects of the original exchange + // (the original thread will run in parallel) + answer = ExchangeHelper.createCorrelatedCopy(exchange, false); + if (answer.hasOut()) { + // move OUT to IN (pipes and filters) + answer.setIn(answer.getOut()); + answer.setOut(null); + } + // set MEP to InOnly as this wire tap is a fire and forget + answer.setPattern(ExchangePattern.InOnly); + } else { + // use the exchange as-is + answer = exchange; } - // set MEP to InOnly as this wire tap is a fire and forget - answer.setPattern(ExchangePattern.InOnly); if (useOriginalBody) { LOG.trace("Using the original IN message instead of current"); @@ -152,7 +169,7 @@ public class OnCompletionProcessor extends ServiceSupport implements AsyncProces return answer; } - private final class OnCompletionSynchronization extends SynchronizationAdapter implements Ordered { + private final class OnCompletionSynchronizationAfterConsumer extends SynchronizationAdapter implements Ordered { public int getOrder() { // we want to be last @@ -173,13 +190,19 @@ public class OnCompletionProcessor extends ServiceSupport implements AsyncProces // must use a copy as we dont want it to cause side effects of the original exchange final Exchange copy = prepareExchange(exchange); - executorService.submit(new Callable<Exchange>() { - public Exchange call() throws Exception { - LOG.debug("Processing onComplete: {}", copy); - doProcess(processor, copy); - return copy; - } - }); + if (executorService != null) { + executorService.submit(new Callable<Exchange>() { + public Exchange call() throws Exception { + LOG.debug("Processing onComplete: {}", copy); + doProcess(processor, copy); + return copy; + } + }); + } else { + // run without thread-pool + LOG.debug("Processing onComplete: {}", copy); + doProcess(processor, copy); + } } public void onFailure(final Exchange exchange) { @@ -192,19 +215,31 @@ public class OnCompletionProcessor extends ServiceSupport implements AsyncProces return; } + // must use a copy as we dont want it to cause side effects of the original exchange final Exchange copy = prepareExchange(exchange); + final Exception original = copy.getException(); // must remove exception otherwise onFailure routing will fail as well // the caused exception is stored as a property (Exchange.EXCEPTION_CAUGHT) on the exchange copy.setException(null); - executorService.submit(new Callable<Exchange>() { - public Exchange call() throws Exception { - LOG.debug("Processing onFailure: {}", copy); - doProcess(processor, copy); - return null; - } - }); + if (executorService != null) { + executorService.submit(new Callable<Exchange>() { + public Exchange call() throws Exception { + LOG.debug("Processing onFailure: {}", copy); + doProcess(processor, copy); + // restore exception after processing + copy.setException(original); + return null; + } + }); + } else { + // run without thread-pool + LOG.debug("Processing onFailure: {}", copy); + doProcess(processor, copy); + // restore exception after processing + copy.setException(original); + } } @Override @@ -219,6 +254,52 @@ public class OnCompletionProcessor extends ServiceSupport implements AsyncProces } } + private final class OnCompletionSynchronizationBeforeConsumer extends SynchronizationAdapter implements Ordered { + + public int getOrder() { + // we want to be last + return Ordered.LOWEST; + } + + @Override + public void onAfterRoute(Route route, Exchange exchange) { + if (exchange.isFailed() && onCompleteOnly) { + return; + } + + if (!exchange.isFailed() && onFailureOnly) { + return; + } + + if (onWhen != null && !onWhen.matches(exchange)) { + // predicate did not match so do not route the onComplete + return; + } + + // must use a copy as we dont want it to cause side effects of the original exchange + final Exchange copy = prepareExchange(exchange); + + if (executorService != null) { + executorService.submit(new Callable<Exchange>() { + public Exchange call() throws Exception { + LOG.debug("Processing onAfterRoute: {}", copy); + doProcess(processor, copy); + return copy; + } + }); + } else { + // run without thread-pool + LOG.debug("Processing onAfterRoute: {}", copy); + doProcess(processor, copy); + } + } + + @Override + public String toString() { + return "onAfterRoute"; + } + } + @Override public String toString() { return "OnCompletionProcessor[" + processor + "]"; http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/camel-core/src/main/resources/org/apache/camel/model/jaxb.index ---------------------------------------------------------------------- diff --git a/camel-core/src/main/resources/org/apache/camel/model/jaxb.index b/camel-core/src/main/resources/org/apache/camel/model/jaxb.index index f0d43ae..ea0d2b9 100644 --- a/camel-core/src/main/resources/org/apache/camel/model/jaxb.index +++ b/camel-core/src/main/resources/org/apache/camel/model/jaxb.index @@ -43,6 +43,7 @@ LoopDefinition MarshalDefinition MulticastDefinition OnCompletionDefinition +OnCompletionMode OnExceptionDefinition OptimisticLockRetryPolicyDefinition OptionalIdentifiedDefinition http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/camel-core/src/test/java/org/apache/camel/processor/OnCompletionAsyncTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/OnCompletionAsyncTest.java b/camel-core/src/test/java/org/apache/camel/processor/OnCompletionAsyncTest.java index 5c4ef2d..386e402 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/OnCompletionAsyncTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/OnCompletionAsyncTest.java @@ -37,7 +37,7 @@ public class OnCompletionAsyncTest extends ContextTestSupport { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { - onCompletion() + onCompletion().parallelProcessing() .to("mock:before") .delay(1000) .setBody(simple("OnComplete:${body}")) @@ -67,7 +67,7 @@ public class OnCompletionAsyncTest extends ContextTestSupport { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { - onCompletion() + onCompletion().parallelProcessing() .to("mock:before") .delay(1000) .setBody(simple("OnComplete:${body}")) @@ -101,7 +101,7 @@ public class OnCompletionAsyncTest extends ContextTestSupport { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { - onCompletion().useOriginalBody() + onCompletion().useOriginalBody().parallelProcessing() .to("mock:before") .delay(1000) .setBody(simple("OnComplete:${body}")) @@ -131,7 +131,7 @@ public class OnCompletionAsyncTest extends ContextTestSupport { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { - onCompletion().useOriginalBody() + onCompletion().useOriginalBody().parallelProcessing() .to("mock:before") .delay(1000) .setBody(simple("OnComplete:${body}")) @@ -166,7 +166,7 @@ public class OnCompletionAsyncTest extends ContextTestSupport { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { - onCompletion() + onCompletion().parallelProcessing() .to("mock:before") .delay(1000) .setBody(simple("OnComplete:${body}")) http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/camel-core/src/test/java/org/apache/camel/processor/OnCompletionModeTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/OnCompletionModeTest.java b/camel-core/src/test/java/org/apache/camel/processor/OnCompletionModeTest.java new file mode 100644 index 0000000..e958ef2 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/OnCompletionModeTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +public class OnCompletionModeTest extends ContextTestSupport { + + public void testOnCompletionScopeBefore() throws Exception { + getMockEndpoint("mock:input").expectedBodiesReceived("Camel"); + getMockEndpoint("mock:after").expectedBodiesReceived("I was here Hello Camel"); + + String out = template.requestBody("seda:foo", "Camel", String.class); + assertEquals("I was here Hello Camel", out); + + assertMockEndpointsSatisfied(); + } + + public void testOnCompletionScopeAfter() throws Exception { + getMockEndpoint("mock:input").expectedBodiesReceived("World"); + getMockEndpoint("mock:after").expectedBodiesReceived("I was here Hello World"); + + String out = template.requestBody("seda:bar", "World", String.class); + assertEquals("Hello World", out); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("seda:foo") + // we do not want parallel as we want to change the message before the consumer writes the response + .onCompletion().modeBeforeConsumer() + .transform(body().prepend("I was here ")) + .to("mock:after") + .end() + .to("mock:input") + .transform(body().prepend("Hello ")).to("log:foo"); + + from("seda:bar") + // need to use parallel to make copy so we do not do side-effects + .onCompletion().modeAfterConsumer().parallelProcessing() + .transform(body().prepend("I was here ")) + .to("mock:after") + .end() + .to("mock:input") + .transform(body().prepend("Hello ")).to("log:bar"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/camel-core/src/test/java/org/apache/camel/processor/OnCompletionParallelProcessingTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/OnCompletionParallelProcessingTest.java b/camel-core/src/test/java/org/apache/camel/processor/OnCompletionParallelProcessingTest.java new file mode 100644 index 0000000..da99971 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/OnCompletionParallelProcessingTest.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +public class OnCompletionParallelProcessingTest extends ContextTestSupport { + + public void testOnCompletionParallel() throws Exception { + getMockEndpoint("mock:input").expectedBodiesReceived("World"); + getMockEndpoint("mock:after").expectedBodiesReceived("I was here Hello World"); + + String out = template.requestBody("seda:bar", "World", String.class); + assertEquals("Hello World", out); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("seda:bar") + .onCompletion().parallelProcessing() + .transform(body().prepend("I was here ")) + .to("mock:after") + .end() + .to("mock:input") + .transform(body().prepend("Hello ")).to("log:bar"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/camel-core/src/test/java/org/apache/camel/processor/OnCompletionUseOriginalBodyTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/OnCompletionUseOriginalBodyTest.java b/camel-core/src/test/java/org/apache/camel/processor/OnCompletionUseOriginalBodyTest.java index ade4fb5..dc6d31f 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/OnCompletionUseOriginalBodyTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/OnCompletionUseOriginalBodyTest.java @@ -46,7 +46,7 @@ public class OnCompletionUseOriginalBodyTest extends ContextTestSupport { return new RouteBuilder() { @Override public void configure() throws Exception { - onCompletion().useOriginalBody() + onCompletion().useOriginalBody().parallelProcessing() .to("mock:before") .delay(1000) .setBody(simple("OnComplete:${body}")) http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsOnCompletionAndInterceptAndOnExceptionTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsOnCompletionAndInterceptAndOnExceptionTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsOnCompletionAndInterceptAndOnExceptionTest.java index 30eb66d..67a3b45 100644 --- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsOnCompletionAndInterceptAndOnExceptionTest.java +++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsOnCompletionAndInterceptAndOnExceptionTest.java @@ -69,10 +69,10 @@ public class JmsOnCompletionAndInterceptAndOnExceptionTest extends CamelTestSupp public void configure() throws Exception { intercept().to("mock:intercept"); - // define a global on completion that is invoked when the exchage is complete + // define a global on completion that is invoked when the exchange is complete onCompletion().to("log:global").to("mock:sync"); - // define an on excpetion + // define an on exception onException(Exception.class).to("mock:exception"); from("activemq:queue:start") http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsOnCompletionTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsOnCompletionTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsOnCompletionTest.java index 171be60..3d51861 100644 --- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsOnCompletionTest.java +++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsOnCompletionTest.java @@ -73,7 +73,7 @@ public class JmsOnCompletionTest extends CamelTestSupport { .to("mock:sync") // must use end to denote the end of the onCompletion route .end() - // here the original route contiues + // here the original route continues .process(new MyProcessor()) .to("mock:result"); // END SNIPPET: e1 http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SOnCompletionDefinition.scala ---------------------------------------------------------------------- diff --git a/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SOnCompletionDefinition.scala b/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SOnCompletionDefinition.scala index 94659f4..9277819 100644 --- a/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SOnCompletionDefinition.scala +++ b/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SOnCompletionDefinition.scala @@ -32,8 +32,12 @@ case class SOnCompletionDefinition(override val target : OnCompletionDefinition) def onFailureOnly = wrap(target.onFailureOnly) def onCompleteOnly = wrap(target.onCompleteOnly) - def useOriginalBody = wrap(target.useOriginalBody()) - + def useOriginalBody = wrap(target.useOriginalBody) + + def modeBeforeConsumer = wrap(target.modeBeforeConsumer) + def modeAfterConsumer = wrap(target.modeAfterConsumer) + + def parallelProcessing = wrap(target.parallelProcessing) def executorService(executorService: ExecutorService) = wrap(target.setExecutorService(executorService)) def executorServiceRef(ref: String) = wrap(target.setExecutorServiceRef(ref)) http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionModeTest.scala ---------------------------------------------------------------------- diff --git a/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionModeTest.scala b/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionModeTest.scala new file mode 100644 index 0000000..8e981bb --- /dev/null +++ b/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionModeTest.scala @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.scala.dsl + +import org.apache.camel.scala.dsl.builder.{RouteBuilderSupport, RouteBuilder} +import org.apache.camel.processor.OnCompletionModeTest + +class SOnCompletionModeTest extends OnCompletionModeTest with RouteBuilderSupport { + + override def createRouteBuilder = new RouteBuilder { + + "seda:foo" ==> { + onCompletion.modeBeforeConsumer { + transform(simple("I was here ${body}")) + to("mock:after") + } + to("mock:input") + transform(simple("Hello ${body}")) + to("log:foo") + } + + "seda:bar" ==> { + onCompletion.modeAfterConsumer { + transform(simple("I was here ${body}")) + to("mock:after") + } + to("mock:input") + transform(simple("Hello ${body}")) + to("log:bar") + } + + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionOnCompleteOnlyTest.scala ---------------------------------------------------------------------- diff --git a/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionOnCompleteOnlyTest.scala b/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionOnCompleteOnlyTest.scala index 837c997..354c34d 100644 --- a/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionOnCompleteOnlyTest.scala +++ b/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionOnCompleteOnlyTest.scala @@ -26,7 +26,7 @@ class SOnCompletionOnCompleteOnlyTest extends OnCompletionOnCompleteOnlyTest wit override def createRouteBuilder = new RouteBuilder { "direct:start" ==> { - onCompletion(completeOnly) { + onCompletion(completeOnly).parallelProcessing { to("mock:sync") } process(new MyProcessor()) http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionOnFailureOnlyTest.scala ---------------------------------------------------------------------- diff --git a/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionOnFailureOnlyTest.scala b/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionOnFailureOnlyTest.scala index e4ce4cb..f645d54 100644 --- a/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionOnFailureOnlyTest.scala +++ b/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionOnFailureOnlyTest.scala @@ -26,7 +26,7 @@ class SOnCompletionOnFailureOnlyTest extends OnCompletionOnFailureOnlyTest with override def createRouteBuilder = new RouteBuilder { "direct:start" ==> { - onCompletion(failureOnly) { + onCompletion(failureOnly).parallelProcessing { to("mock:sync") } process(new MyProcessor()) http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringOnCompletionModeTest.java ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringOnCompletionModeTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringOnCompletionModeTest.java new file mode 100644 index 0000000..eee48ee --- /dev/null +++ b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringOnCompletionModeTest.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spring.processor; + +import org.apache.camel.CamelContext; +import org.apache.camel.processor.OnCompletionModeTest; + +import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext; + +/** + * @version + */ +public class SpringOnCompletionModeTest extends OnCompletionModeTest { + + protected CamelContext createCamelContext() throws Exception { + return createSpringCamelContext(this, "org/apache/camel/spring/processor/SpringOnCompletionModeTest.xml"); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionModeTest.xml ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionModeTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionModeTest.xml new file mode 100644 index 0000000..2bc1211 --- /dev/null +++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionModeTest.xml @@ -0,0 +1,59 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd + http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd + "> + + <camelContext xmlns="http://camel.apache.org/schema/spring"> + + <route> + <from uri="seda:foo"/> + <onCompletion mode="BeforeConsumer"> + <transform> + <simple>I was here ${body}</simple> + </transform> + <to uri="mock:after"/> + </onCompletion> + <to uri="mock:input"/> + <transform> + <simple>Hello ${body}</simple> + </transform> + <to uri="log:foo"/> + </route> + + <route> + <from uri="seda:bar"/> + <onCompletion mode="AfterConsumer" parallelProcessing="true"> + <transform> + <simple>I was here ${body}</simple> + </transform> + <to uri="mock:after"/> + </onCompletion> + <to uri="mock:input"/> + <transform> + <simple>Hello ${body}</simple> + </transform> + <to uri="log:bar"/> + </route> + + </camelContext> + +</beans> http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionUseOriginalBodyTest.xml ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionUseOriginalBodyTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionUseOriginalBodyTest.xml index f125623..83b3a42 100644 --- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionUseOriginalBodyTest.xml +++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionUseOriginalBodyTest.xml @@ -25,7 +25,7 @@ <bean id="myProcessor" class="org.apache.camel.processor.OnCompletionUseOriginalBodyTest$MyProcessor"/> <camelContext xmlns="http://camel.apache.org/schema/spring"> - <onCompletion useOriginalMessage="true"> + <onCompletion useOriginalMessage="true" parallelProcessing="true"> <to uri="mock:before"/> <delay> <constant>1000</constant>