Author: davsclaus Date: Wed Apr 13 11:47:06 2011 New Revision: 1091752 URL: http://svn.apache.org/viewvc?rev=1091752&view=rev Log: CAMEL-3857: WireTap can now set header directly in DSL when in send new message mode.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WireTapNewExchangeTest.java - copied, changed from r1091660, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WireTapUsingFireAndForgetTest.java camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringWireTapNewExchangeTest.java - copied, changed from r1091440, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetTest.java camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapNewExchangeTest.xml - copied, changed from r1091440, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetTest.xml Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java?rev=1091752&r1=1091751&r2=1091752&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java Wed Apr 13 11:47:06 2011 @@ -16,11 +16,14 @@ */ package org.apache.camel.model; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ExecutorService; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlAttribute; import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlElementRef; import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlTransient; @@ -47,11 +50,12 @@ public class WireTapDefinition<Type exte protected Endpoint endpoint; @XmlTransient private Processor newExchangeProcessor; - // TODO: Should be named newExchangeRef instead of processorRef (Camel 3.0) @XmlAttribute(name = "processorRef") private String newExchangeProcessorRef; @XmlElement(name = "body") private ExpressionSubElementDefinition newExchangeExpression; + @XmlElementRef + private List<SetHeaderDefinition> headers = new ArrayList<SetHeaderDefinition>(); @XmlTransient private ExecutorService executorService; @XmlAttribute @@ -88,10 +92,18 @@ public class WireTapDefinition<Type exte if (newExchangeProcessorRef != null) { newExchangeProcessor = routeContext.lookup(newExchangeProcessorRef, Processor.class); } - answer.setNewExchangeProcessor(newExchangeProcessor); + if (newExchangeProcessor != null) { + answer.addNewExchangeProcessor(newExchangeProcessor); + } if (newExchangeExpression != null) { answer.setNewExchangeExpression(newExchangeExpression.createExpression(routeContext)); } + if (headers != null && !headers.isEmpty()) { + for (SetHeaderDefinition header : headers) { + Processor processor = header.createProcessor(routeContext); + answer.addNewExchangeProcessor(processor); + } + } if (onPrepareRef != null) { onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class); } @@ -180,12 +192,21 @@ public class WireTapDefinition<Type exte } /** + * @deprecated use newExchangeBody + */ + @Deprecated + public WireTapDefinition<Type> newExchange(Expression expression) { + return newExchangeBody(expression); + } + + /** * Sends a <i>new</i> Exchange, instead of tapping an existing, using {@link ExchangePattern#InOnly} * * @param expression expression that creates the new body to send * @return the builder + * @see #newExchangeHeader(String, org.apache.camel.Expression) */ - public WireTapDefinition<Type> newExchange(Expression expression) { + public WireTapDefinition<Type> newExchangeBody(Expression expression) { setNewExchangeExpression(expression); return this; } @@ -193,8 +214,21 @@ public class WireTapDefinition<Type exte /** * Sends a <i>new</i> Exchange, instead of tapping an existing, using {@link ExchangePattern#InOnly} * + * @param ref reference to the {@link Processor} to lookup in the {@link org.apache.camel.spi.Registry} to + * be used for preparing the new exchange to send + * @return the builder + */ + public WireTapDefinition<Type> newExchangeRef(String ref) { + setNewExchangeProcessorRef(ref); + return this; + } + + /** + * Sends a <i>new</i> Exchange, instead of tapping an existing, using {@link ExchangePattern#InOnly} + * * @param processor processor preparing the new exchange to send * @return the builder + * @see #newExchangeHeader(String, org.apache.camel.Expression) */ public WireTapDefinition<Type> newExchange(Processor processor) { setNewExchangeProcessor(processor); @@ -202,14 +236,17 @@ public class WireTapDefinition<Type exte } /** - * Sends a <i>new</i> Exchange, instead of tapping an existing, using {@link ExchangePattern#InOnly} + * Sets a header on the <i>new</i> Exchange, instead of tapping an existing, using {@link ExchangePattern#InOnly}. + * <p/> + * Use this together with the {@link #newExchange(org.apache.camel.Expression)} or {@link #newExchange(org.apache.camel.Processor)} + * methods. * - * @param ref reference to the processor to lookup in the {@link org.apache.camel.spi.Registry} to - * be used for preparing the new exchange to send + * @param headerName the header name + * @param expression the expression setting the header value * @return the builder */ - public WireTapDefinition<Type> newExchangeRef(String ref) { - setNewExchangeProcessorRef(ref); + public WireTapDefinition<Type> newExchangeHeader(String headerName, Expression expression) { + headers.add(new SetHeaderDefinition(headerName, expression)); return this; } @@ -335,4 +372,12 @@ public class WireTapDefinition<Type exte public void setOnPrepare(Processor onPrepare) { this.onPrepare = onPrepare; } + + public List<SetHeaderDefinition> getHeaders() { + return headers; + } + + public void setHeaders(List<SetHeaderDefinition> headers) { + this.headers = headers; + } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java?rev=1091752&r1=1091751&r2=1091752&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java Wed Apr 13 11:47:06 2011 @@ -16,6 +16,8 @@ */ package org.apache.camel.processor; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -42,7 +44,7 @@ public class WireTapProcessor extends Se // expression or processor used for populating a new exchange to send // as opposed to traditional wiretap that sends a copy of the original exchange private Expression newExchangeExpression; - private Processor newExchangeProcessor; + private List<Processor> newExchangeProcessors; private boolean copy; private Processor onPrepare; @@ -132,25 +134,29 @@ public class WireTapProcessor extends Se answer.setProperty(Exchange.TO_ENDPOINT, destination.getEndpointUri()); // prepare the exchange - if (newExchangeProcessor != null) { - try { - newExchangeProcessor.process(answer); - } catch (Exception e) { - throw ObjectHelper.wrapRuntimeCamelException(e); - } - } else if (newExchangeExpression != null) { + if (newExchangeExpression != null) { Object body = newExchangeExpression.evaluate(answer, Object.class); if (body != null) { answer.getIn().setBody(body); } } + if (newExchangeProcessors != null) { + for (Processor processor : newExchangeProcessors) { + try { + processor.process(answer); + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + } + } + // invoke on prepare on the exchange if specified if (onPrepare != null) { try { onPrepare.process(exchange); } catch (Exception e) { - exchange.setException(e); + throw ObjectHelper.wrapRuntimeCamelException(e); } } @@ -169,12 +175,12 @@ public class WireTapProcessor extends Se return new DefaultExchange(exchange.getFromEndpoint(), ExchangePattern.InOnly); } - public Processor getNewExchangeProcessor() { - return newExchangeProcessor; + public List<Processor> getNewExchangeProcessors() { + return newExchangeProcessors; } - public void setNewExchangeProcessor(Processor newExchangeProcessor) { - this.newExchangeProcessor = newExchangeProcessor; + public void setNewExchangeProcessors(List<Processor> newExchangeProcessors) { + this.newExchangeProcessors = newExchangeProcessors; } public Expression getNewExchangeExpression() { @@ -185,6 +191,13 @@ public class WireTapProcessor extends Se this.newExchangeExpression = newExchangeExpression; } + public void addNewExchangeProcessor(Processor processor) { + if (newExchangeProcessors == null) { + newExchangeProcessors = new ArrayList<Processor>(); + } + newExchangeProcessors.add(processor); + } + public boolean isCopy() { return copy; } @@ -200,4 +213,5 @@ public class WireTapProcessor extends Se public void setOnPrepare(Processor onPrepare) { this.onPrepare = onPrepare; } + } Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WireTapNewExchangeTest.java (from r1091660, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WireTapUsingFireAndForgetTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WireTapNewExchangeTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WireTapNewExchangeTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WireTapUsingFireAndForgetTest.java&r1=1091660&r2=1091752&rev=1091752&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WireTapUsingFireAndForgetTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WireTapNewExchangeTest.java Wed Apr 13 11:47:06 2011 @@ -16,96 +16,56 @@ */ package org.apache.camel.processor; +import java.text.SimpleDateFormat; +import java.util.Date; + import org.apache.camel.ContextTestSupport; -import org.apache.camel.Exchange; -import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; /** * @version */ -public class WireTapUsingFireAndForgetTest extends ContextTestSupport { - - @Override - public boolean isUseRouteBuilder() { - return false; - } - - public void testFireAndForgetUsingProcessor() throws Exception { - context.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - // START SNIPPET: e1 - from("direct:start") - .wireTap("direct:foo", false, new Processor() { - public void process(Exchange exchange) throws Exception { - exchange.getIn().setBody("Bye World"); - exchange.getIn().setHeader("foo", "bar"); - } - }).to("mock:result"); - - - from("direct:foo").to("mock:foo"); - // END SNIPPET: e1 - } - }); - context.start(); +public class WireTapNewExchangeTest extends ContextTestSupport { + public void testFireAndForgetUsingExpressions() throws Exception { MockEndpoint result = getMockEndpoint("mock:result"); result.expectedBodiesReceived("Hello World"); - MockEndpoint foo = getMockEndpoint("mock:foo"); - foo.expectedBodiesReceived("Bye World"); - foo.expectedHeaderReceived("foo", "bar"); + MockEndpoint tap = getMockEndpoint("mock:tap"); + tap.expectedBodiesReceived("Bye World"); + tap.expectedHeaderReceived("id", 123); + String today = new SimpleDateFormat("yyyyMMdd").format(new Date()); + tap.expectedHeaderReceived("date", today); template.sendBody("direct:start", "Hello World"); assertMockEndpointsSatisfied(); - - // should be different exchange instances - Exchange e1 = result.getReceivedExchanges().get(0); - Exchange e2 = foo.getReceivedExchanges().get(0); - assertNotSame("Should not be same Exchange", e1, e2); - - // should have same from endpoint - assertEquals("direct://start", e1.getFromEndpoint().getEndpointUri()); - assertEquals("direct://start", e2.getFromEndpoint().getEndpointUri()); } - public void testFireAndForgetUsingExpression() throws Exception { - context.addRoutes(new RouteBuilder() { + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { @Override public void configure() throws Exception { - // START SNIPPET: e2 + // START SNIPPET: e1 from("direct:start") - .wireTap("direct:foo", false, constant("Bye World")) + // tap a new message and send it to direct:tap + // the new message should be Bye World with 2 headers + .wireTap("direct:tap") + // create the new tap message body and headers + .newExchangeBody(constant("Bye World")) + .newExchangeHeader("id", constant(123)) + .newExchangeHeader("date", simple("${date:now:yyyyMMdd}")) + .end() + // here we continue routing the original messages .to("mock:result"); - from("direct:foo").to("mock:foo"); - // END SNIPPET: e2 + // this is the tapped route + from("direct:tap") + .to("mock:tap"); + // END SNIPPET: e1 } - }); - context.start(); - - MockEndpoint result = getMockEndpoint("mock:result"); - result.expectedBodiesReceived("Hello World"); - - MockEndpoint foo = getMockEndpoint("mock:foo"); - foo.expectedBodiesReceived("Bye World"); - - template.sendBody("direct:start", "Hello World"); - - assertMockEndpointsSatisfied(); - - // should be different exchange instances - Exchange e1 = result.getReceivedExchanges().get(0); - Exchange e2 = foo.getReceivedExchanges().get(0); - assertNotSame("Should not be same Exchange", e1, e2); - - // should have same from endpoint - assertEquals("direct://start", e1.getFromEndpoint().getEndpointUri()); - assertEquals("direct://start", e2.getFromEndpoint().getEndpointUri()); + }; } - } Copied: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringWireTapNewExchangeTest.java (from r1091440, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringWireTapNewExchangeTest.java?p2=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringWireTapNewExchangeTest.java&p1=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetTest.java&r1=1091440&r2=1091752&rev=1091752&view=diff ============================================================================== --- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetTest.java (original) +++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringWireTapNewExchangeTest.java Wed Apr 13 11:47:06 2011 @@ -17,71 +17,15 @@ package org.apache.camel.spring.processor; import org.apache.camel.CamelContext; -import org.apache.camel.ContextTestSupport; -import org.apache.camel.Exchange; -import org.apache.camel.Processor; -import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.WireTapNewExchangeTest; + import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext; -public class SpringWireTapUsingFireAndForgetTest extends ContextTestSupport { +public class SpringWireTapNewExchangeTest extends WireTapNewExchangeTest { protected CamelContext createCamelContext() throws Exception { - return createSpringCamelContext(this, "org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetTest.xml"); - } - - public void testFireAndForgetUsingExpression() throws Exception { - MockEndpoint result = getMockEndpoint("mock:result"); - result.expectedBodiesReceived("Hello World"); - - MockEndpoint foo = getMockEndpoint("mock:foo"); - foo.expectedBodiesReceived("Bye World"); - - template.sendBody("direct:start", "Hello World"); - - assertMockEndpointsSatisfied(); - - // should be different exchange instances - Exchange e1 = result.getReceivedExchanges().get(0); - Exchange e2 = foo.getReceivedExchanges().get(0); - assertNotSame("Should not be same Exchange", e1, e2); - - // should have same from endpoint - assertEquals("direct://start", e1.getFromEndpoint().getEndpointUri()); - assertEquals("direct://start", e2.getFromEndpoint().getEndpointUri()); - } - - public void testFireAndForgetUsingProcessor() throws Exception { - MockEndpoint result = getMockEndpoint("mock:result"); - result.expectedBodiesReceived("Hello World"); - - MockEndpoint foo = getMockEndpoint("mock:foo"); - foo.expectedBodiesReceived("Bye World"); - foo.expectedHeaderReceived("foo", "bar"); - - template.sendBody("direct:start2", "Hello World"); - - assertMockEndpointsSatisfied(); - - // should be different exchange instances - Exchange e1 = result.getReceivedExchanges().get(0); - Exchange e2 = foo.getReceivedExchanges().get(0); - assertNotSame("Should not be same Exchange", e1, e2); - - // should have same from endpoint - assertEquals("direct://start2", e1.getFromEndpoint().getEndpointUri()); - assertEquals("direct://start2", e2.getFromEndpoint().getEndpointUri()); + return createSpringCamelContext(this, "org/apache/camel/spring/processor/SpringWireTapNewExchangeTest.xml"); } - // START SNIPPET: e1 - public static class MyProcessor implements Processor { - - public void process(Exchange exchange) throws Exception { - // here we prepare the new exchange by setting the payload on the exchange - // on the IN message. - exchange.getIn().setBody("Bye World"); - exchange.getIn().setHeader("foo", "bar"); - } - } - // END SNIPPET: e1 +} -} \ No newline at end of file Copied: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapNewExchangeTest.xml (from r1091440, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetTest.xml) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapNewExchangeTest.xml?p2=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapNewExchangeTest.xml&p1=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetTest.xml&r1=1091440&r2=1091752&rev=1091752&view=diff ============================================================================== --- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetTest.xml (original) +++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapNewExchangeTest.xml Wed Apr 13 11:47:06 2011 @@ -22,33 +22,29 @@ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd "> - <camelContext xmlns="http://camel.apache.org/schema/spring"> <!-- START SNIPPET: e1 --> <route> <from uri="direct:start"/> - <wireTap uri="direct:foo"> + <!-- tap a new message and send it to direct:tap --> + <!-- the new message should be Bye World with 2 headers --> + <wireTap uri="direct:tap"> + <!-- create the new tap message body and headers --> <body><constant>Bye World</constant></body> + <setHeader headerName="id"><constant>123</constant></setHeader> + <setHeader headerName="date"><simple>${date:now:yyyyMMdd}</simple></setHeader> </wireTap> + <!-- here we continue routing the original message --> <to uri="mock:result"/> </route> <!-- END SNIPPET: e1 --> - <!-- START SNIPPET: e2 --> - <route> - <from uri="direct:start2"/> - <wireTap uri="direct:foo" processorRef="myProcessor"/> - <to uri="mock:result"/> - </route> - <!-- END SNIPPET: e2 --> - + <!-- this is the tapped route --> <route> - <from uri="direct:foo"/> - <to uri="mock:foo"/> + <from uri="direct:tap"/> + <to uri="mock:tap"/> </route> </camelContext> - <bean id="myProcessor" class="org.apache.camel.spring.processor.SpringWireTapUsingFireAndForgetTest$MyProcessor"/> - </beans>