Author: davsclaus Date: Thu May 27 06:09:18 2010 New Revision: 948683 URL: http://svn.apache.org/viewvc?rev=948683&view=rev Log: CAMEL-2758: Fixed onCompletion to pass OUT as IN. Added option useOriginalBody to it as well.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionAsyncTest.java (contents, props changed) - copied, changed from r948330, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionGlobalTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionUseOriginalBodyTest.java (with props) camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringOnCompletionUseOriginalBodyTest.java - copied, changed from r948665, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringOnCompletionGlobalTest.java camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionUseOriginalBodyTest.xml - copied, changed from r948665, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionGlobalTest.xml Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java?rev=948683&r1=948682&r2=948683&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java Thu May 27 06:09:18 2010 @@ -57,6 +57,8 @@ public class OnCompletionDefinition exte private ExecutorService executorService; @XmlAttribute(required = false) private String executorServiceRef; + @XmlAttribute(name = "useOriginalMessage", required = false) + private Boolean useOriginalMessagePolicy = Boolean.FALSE; public OnCompletionDefinition() { } @@ -101,8 +103,8 @@ public class OnCompletionDefinition exte if (executorService == null) { executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, "OnCompletion"); } - OnCompletionProcessor answer = new OnCompletionProcessor(routeContext.getCamelContext(), childProcessor, executorService, - onCompleteOnly, onFailureOnly, when); + OnCompletionProcessor answer = new OnCompletionProcessor(routeContext.getCamelContext(), childProcessor, + executorService, onCompleteOnly, onFailureOnly, when, useOriginalMessagePolicy); return answer; } @@ -185,7 +187,19 @@ public class OnCompletionDefinition exte onWhen.setExpression(clause); return clause; } - + + /** + * Will use the original input body when an {...@link org.apache.camel.Exchange} for this on completion. + * <p/> + * By default this feature is off. + * + * @return the builder + */ + public OnCompletionDefinition useOriginalBody() { + setUseOriginalMessagePolicy(Boolean.TRUE); + return this; + } + public OnCompletionDefinition executorService(ExecutorService executorService) { setExecutorService(executorService); return this; @@ -243,4 +257,13 @@ public class OnCompletionDefinition exte public void setExecutorServiceRef(String executorServiceRef) { this.executorServiceRef = executorServiceRef; } + + public Boolean getUseOriginalMessagePolicy() { + return useOriginalMessagePolicy; + } + + public void setUseOriginalMessagePolicy(Boolean useOriginalMessagePolicy) { + this.useOriginalMessagePolicy = useOriginalMessagePolicy; + } + } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java?rev=948683&r1=948682&r2=948683&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java Thu May 27 06:09:18 2010 @@ -22,6 +22,7 @@ import java.util.concurrent.ExecutorServ import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; +import org.apache.camel.Message; import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.impl.ServiceSupport; @@ -42,15 +43,15 @@ public class OnCompletionProcessor exten private final CamelContext camelContext; private final Processor processor; private final ExecutorService executorService; - private boolean onCompleteOnly; - private boolean onFailureOnly; - private Predicate onWhen; + private final boolean onCompleteOnly; + private final boolean onFailureOnly; + private final Predicate onWhen; + private final boolean useOriginalBody; public OnCompletionProcessor(CamelContext camelContext, Processor processor, ExecutorService executorService, - boolean onCompleteOnly, boolean onFailureOnly, Predicate onWhen) { + boolean onCompleteOnly, boolean onFailureOnly, Predicate onWhen, boolean useOriginalBody) { notNull(camelContext, "camelContext"); notNull(processor, "processor"); - notNull(executorService, "executorService"); this.camelContext = camelContext; // wrap processor in UnitOfWork so what we send out runs in a UoW this.processor = new UnitOfWorkProcessor(processor); @@ -58,6 +59,7 @@ public class OnCompletionProcessor exten this.onCompleteOnly = onCompleteOnly; this.onFailureOnly = onFailureOnly; this.onWhen = onWhen; + this.useOriginalBody = useOriginalBody; } protected void doStart() throws Exception { @@ -81,7 +83,7 @@ public class OnCompletionProcessor exten // register callback exchange.getUnitOfWork().addSynchronization(new SynchronizationAdapter() { @Override - public void onComplete(Exchange exchange) { + public void onComplete(final Exchange exchange) { if (onFailureOnly) { return; } @@ -105,7 +107,7 @@ public class OnCompletionProcessor exten }); } - public void onFailure(Exchange exchange) { + public void onFailure(final Exchange exchange) { if (onCompleteOnly) { return; } @@ -127,7 +129,7 @@ public class OnCompletionProcessor exten LOG.debug("Processing onFailure: " + copy); } doProcess(processor, copy); - return copy; + return null; } }); } @@ -159,7 +161,6 @@ public class OnCompletionProcessor exten } } - /** * Prepares the {...@link Exchange} to send as onCompletion. * @@ -167,13 +168,33 @@ public class OnCompletionProcessor exten * @return the exchange to be routed in onComplete */ protected Exchange prepareExchange(Exchange exchange) { - // must use a copy as we dont want it to cause side effects of the original exchange - final Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false); + Exchange answer; + + // for asynchronous routing we must use a copy as we dont want it + // to cause side effects of the original exchange + // (the original thread will run in parallel) + answer = ExchangeHelper.createCorrelatedCopy(exchange, false); + if (answer.hasOut()) { + // move OUT to IN (pipes and filters) + answer.setIn(answer.getOut()); + answer.setOut(null); + } // set MEP to InOnly as this wire tap is a fire and forget - copy.setPattern(ExchangePattern.InOnly); + answer.setPattern(ExchangePattern.InOnly); + + if (useOriginalBody) { + if (LOG.isTraceEnabled()) { + LOG.trace("Using the original IN message instead of current"); + } + + Message original = exchange.getUnitOfWork().getOriginalInMessage(); + answer.setIn(original); + } + // add a header flag to indicate its a on completion exchange - copy.setProperty(Exchange.ON_COMPLETION, Boolean.TRUE); - return copy; + answer.setProperty(Exchange.ON_COMPLETION, Boolean.TRUE); + + return answer; } @Override Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionAsyncTest.java (from r948330, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionGlobalTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionAsyncTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionAsyncTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionGlobalTest.java&r1=948330&r2=948683&rev=948683&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionGlobalTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionAsyncTest.java Thu May 27 06:09:18 2010 @@ -23,32 +23,74 @@ import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; +import static org.apache.camel.language.simple.SimpleLanguage.simple; + /** * @version $Revision$ */ -public class OnCompletionGlobalTest extends ContextTestSupport { +public class OnCompletionAsyncTest extends ContextTestSupport { - public void testSynchronizeComplete() throws Exception { - getMockEndpoint("mock:sync").expectedBodiesReceived("Bye World"); - getMockEndpoint("mock:sync").expectedPropertyReceived(Exchange.ON_COMPLETION, true); + @Override + public boolean isUseRouteBuilder() { + return false; + } + + public void testAsyncComplete() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + onCompletion() + .to("mock:before") + .delay(1000) + .setBody(simple("OnComplete:${body}")) + .to("mock:after"); + + from("direct:start") + .process(new MyProcessor()) + .to("mock:result"); + } + }); + context.start(); + + getMockEndpoint("mock:before").expectedBodiesReceived("Bye World"); + getMockEndpoint("mock:before").expectedPropertyReceived(Exchange.ON_COMPLETION, true); + getMockEndpoint("mock:after").expectedBodiesReceived("OnComplete:Bye World"); MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedBodiesReceived("Bye World"); - template.sendBody("direct:start", "Hello World"); + String out = template.requestBody("direct:start", "Hello World", String.class); + assertEquals("Bye World", out); assertMockEndpointsSatisfied(); } - public void testSynchronizeFailure() throws Exception { - getMockEndpoint("mock:sync").expectedMessageCount(1); - getMockEndpoint("mock:sync").expectedPropertyReceived(Exchange.ON_COMPLETION, true); + public void testAsyncFailure() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + onCompletion() + .to("mock:before") + .delay(1000) + .setBody(simple("OnComplete:${body}")) + .to("mock:after"); + + from("direct:start") + .process(new MyProcessor()) + .to("mock:result"); + } + }); + context.start(); + + getMockEndpoint("mock:before").expectedBodiesReceived("Kabom"); + getMockEndpoint("mock:before").expectedPropertyReceived(Exchange.ON_COMPLETION, true); + getMockEndpoint("mock:after").expectedBodiesReceived("OnComplete:Kabom"); MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(0); try { - template.sendBody("direct:start", "Kabom"); + template.requestBody("direct:start", "Kabom"); fail("Should throw exception"); } catch (CamelExecutionException e) { assertEquals("Kabom", e.getCause().getMessage()); @@ -57,21 +99,101 @@ public class OnCompletionGlobalTest exte assertMockEndpointsSatisfied(); } - @Override - protected RouteBuilder createRouteBuilder() throws Exception { - return new RouteBuilder() { + public void testAsyncCompleteUseOriginalBody() throws Exception { + context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { - // START SNIPPET: e1 - // define a global on completion that is invoked when the exchange is complete - onCompletion().to("log:global").to("mock:sync"); + onCompletion().useOriginalBody() + .to("mock:before") + .delay(1000) + .setBody(simple("OnComplete:${body}")) + .to("mock:after"); from("direct:start") .process(new MyProcessor()) .to("mock:result"); - // END SNIPPET: e1 } - }; + }); + context.start(); + + getMockEndpoint("mock:before").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:before").expectedPropertyReceived(Exchange.ON_COMPLETION, true); + getMockEndpoint("mock:after").expectedBodiesReceived("OnComplete:Hello World"); + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("Bye World"); + + String out = template.requestBody("direct:start", "Hello World", String.class); + assertEquals("Bye World", out); + + assertMockEndpointsSatisfied(); + } + + public void testAsyncFailureUseOriginalBody() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + onCompletion().useOriginalBody() + .to("mock:before") + .delay(1000) + .setBody(simple("OnComplete:${body}")) + .to("mock:after"); + + from("direct:start") + .transform(body().prepend("Before:${body}")) + .process(new MyProcessor()) + .to("mock:result"); + } + }); + context.start(); + + getMockEndpoint("mock:before").expectedBodiesReceived("Kabom"); + getMockEndpoint("mock:before").expectedPropertyReceived(Exchange.ON_COMPLETION, true); + getMockEndpoint("mock:after").expectedBodiesReceived("OnComplete:Kabom"); + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(0); + + try { + template.requestBody("direct:start", "Kabom"); + fail("Should throw exception"); + } catch (CamelExecutionException e) { + assertEquals("Kabom", e.getCause().getMessage()); + } + + assertMockEndpointsSatisfied(); + } + + public void testAsyncCompleteOnCompleteFail() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + onCompletion() + .to("mock:before") + .delay(1000) + .setBody(simple("OnComplete:${body}")) + // this exception does not cause any side effect as we are in async mode + .throwException(new IllegalAccessException("From onComplete")) + .to("mock:after"); + + from("direct:start") + .process(new MyProcessor()) + .to("mock:result"); + } + }); + context.start(); + + getMockEndpoint("mock:before").expectedBodiesReceived("Bye World"); + getMockEndpoint("mock:before").expectedPropertyReceived(Exchange.ON_COMPLETION, true); + getMockEndpoint("mock:after").expectedMessageCount(0); + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("Bye World"); + + String out = template.requestBody("direct:start", "Hello World", String.class); + assertEquals("Bye World", out); + + assertMockEndpointsSatisfied(); } public static class MyProcessor implements Processor { @@ -80,10 +202,11 @@ public class OnCompletionGlobalTest exte } public void process(Exchange exchange) throws Exception { - if ("Kabom".equals(exchange.getIn().getBody())) { + if (exchange.getIn().getBody(String.class).contains("Kabom")) { throw new IllegalArgumentException("Kabom"); } exchange.getIn().setBody("Bye World"); } } + } \ No newline at end of file Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionAsyncTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionAsyncTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionUseOriginalBodyTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionUseOriginalBodyTest.java?rev=948683&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionUseOriginalBodyTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionUseOriginalBodyTest.java Thu May 27 06:09:18 2010 @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +import static org.apache.camel.language.simple.SimpleLanguage.simple; + +/** + * @version $Revision$ + */ +public class OnCompletionUseOriginalBodyTest extends ContextTestSupport { + + public void testOnCompletionUseOriginalBody() throws Exception { + getMockEndpoint("mock:before").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:before").expectedPropertyReceived(Exchange.ON_COMPLETION, true); + getMockEndpoint("mock:after").expectedBodiesReceived("OnComplete:Hello World"); + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("Bye World"); + + String out = template.requestBody("direct:start", "Hello World", String.class); + assertEquals("Bye World", out); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + onCompletion().useOriginalBody() + .to("mock:before") + .delay(1000) + .setBody(simple("OnComplete:${body}")) + .to("mock:after"); + + from("direct:start") + .process(new MyProcessor()) + .to("mock:result"); + } + }; + } + + public static class MyProcessor implements Processor { + + public MyProcessor() { + } + + public void process(Exchange exchange) throws Exception { + if (exchange.getIn().getBody(String.class).contains("Kabom")) { + throw new IllegalArgumentException("Kabom"); + } + exchange.getIn().setBody("Bye World"); + } + } + +} \ No newline at end of file Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionUseOriginalBodyTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionUseOriginalBodyTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Copied: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringOnCompletionUseOriginalBodyTest.java (from r948665, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringOnCompletionGlobalTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringOnCompletionUseOriginalBodyTest.java?p2=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringOnCompletionUseOriginalBodyTest.java&p1=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringOnCompletionGlobalTest.java&r1=948665&r2=948683&rev=948683&view=diff ============================================================================== --- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringOnCompletionGlobalTest.java (original) +++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringOnCompletionUseOriginalBodyTest.java Thu May 27 06:09:18 2010 @@ -17,16 +17,17 @@ package org.apache.camel.spring.processor; import org.apache.camel.CamelContext; -import org.apache.camel.processor.OnCompletionTest; +import org.apache.camel.processor.OnCompletionUseOriginalBodyTest; + import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext; /** * @version $Revision$ */ -public class SpringOnCompletionGlobalTest extends OnCompletionTest { +public class SpringOnCompletionUseOriginalBodyTest extends OnCompletionUseOriginalBodyTest { protected CamelContext createCamelContext() throws Exception { - return createSpringCamelContext(this, "org/apache/camel/spring/processor/SpringOnCompletionGlobalTest.xml"); + return createSpringCamelContext(this, "org/apache/camel/spring/processor/SpringOnCompletionUseOriginalBodyTest.xml"); } } \ No newline at end of file Copied: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionUseOriginalBodyTest.xml (from r948665, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionGlobalTest.xml) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionUseOriginalBodyTest.xml?p2=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionUseOriginalBodyTest.xml&p1=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionGlobalTest.xml&r1=948665&r2=948683&rev=948683&view=diff ============================================================================== --- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionGlobalTest.xml (original) +++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionUseOriginalBodyTest.xml Thu May 27 06:09:18 2010 @@ -22,15 +22,18 @@ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd "> - <bean id="myProcessor" class="org.apache.camel.processor.OnCompletionTest$MyProcessor"/> + <bean id="myProcessor" class="org.apache.camel.processor.OnCompletionUseOriginalBodyTest$MyProcessor"/> <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> - <!-- START SNIPPET: e1 --> - <!-- this is a global onCompletion route that is invoke when any exchange is complete - as a kind of after callback --> - <onCompletion> - <to uri="log:global"/> - <to uri="mock:sync"/> + <onCompletion useOriginalMessage="true"> + <to uri="mock:before"/> + <delay> + <constant>1000</constant> + </delay> + <setBody> + <simple>OnComplete:${body}</simple> + </setBody> + <to uri="mock:after"/> </onCompletion> <route> @@ -38,6 +41,6 @@ <process ref="myProcessor"/> <to uri="mock:result"/> </route> - <!-- END SNIPPET: e1 --> </camelContext> + </beans>