CAMEL-10442: Fixed an issue when using pipeline in Java DSL not setting up the EIP correctly which could lead to runtime route not as intended. This is a little Java DSL change so lets keep it for master branch only.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/94d7b0d2 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/94d7b0d2 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/94d7b0d2 Branch: refs/heads/master Commit: 94d7b0d2a3def5bab26ba3542800dc0fd65fd8ab Parents: 74a816b Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Nov 4 17:02:14 2016 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Nov 4 17:44:51 2016 +0100 ---------------------------------------------------------------------- .../apache/camel/model/ProcessorDefinition.java | 15 ++- .../camel/processor/MulticastPipelineTest.java | 109 +++++++++++++++++-- 2 files changed, 111 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/94d7b0d2/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java index b7e8fc9..2ce1fc6 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java @@ -1163,7 +1163,10 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> * @return the builder */ public Type pipeline(String... uris) { - return to(uris); + PipelineDefinition answer = new PipelineDefinition(); + addOutput(answer); + answer.to(uris); + return (Type) this; } /** @@ -1176,7 +1179,10 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> * @return the builder */ public Type pipeline(Endpoint... endpoints) { - return to(endpoints); + PipelineDefinition answer = new PipelineDefinition(); + addOutput(answer); + answer.to(endpoints); + return (Type) this; } /** @@ -1189,7 +1195,10 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> * @return the builder */ public Type pipeline(Collection<Endpoint> endpoints) { - return to(endpoints); + PipelineDefinition answer = new PipelineDefinition(); + addOutput(answer); + answer.to(endpoints); + return (Type) this; } /** http://git-wip-us.apache.org/repos/asf/camel/blob/94d7b0d2/camel-core/src/test/java/org/apache/camel/processor/MulticastPipelineTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/MulticastPipelineTest.java b/camel-core/src/test/java/org/apache/camel/processor/MulticastPipelineTest.java index d0a2999..540cd2b 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/MulticastPipelineTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/MulticastPipelineTest.java @@ -18,15 +18,35 @@ package org.apache.camel.processor; import org.apache.camel.ContextTestSupport; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.util.toolbox.AggregationStrategies; public class MulticastPipelineTest extends ContextTestSupport { - public void testMulticastPipeline() throws Exception { + @Override + public boolean isUseRouteBuilder() { + return false; + } + + public void testPlainPipeline() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .pipeline("direct:a", "direct:b") + .pipeline("direct:c", "direct:d") + .to("mock:result"); + + from("direct:a").to("mock:a").setBody().constant("A"); + from("direct:b").to("mock:b").setBody().constant("B"); + from("direct:c").to("mock:c").setBody().constant("C"); + from("direct:d").to("mock:d").setBody().constant("D"); + } + }); + context.start(); + getMockEndpoint("mock:a").expectedBodiesReceived("Hello World"); - getMockEndpoint("mock:b").expectedBodiesReceived("Hello World"); - getMockEndpoint("mock:c").expectedBodiesReceived("Hello World"); - getMockEndpoint("mock:d").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:b").expectedBodiesReceived("A"); + getMockEndpoint("mock:c").expectedBodiesReceived("B"); + getMockEndpoint("mock:d").expectedBodiesReceived("C"); getMockEndpoint("mock:result").expectedMessageCount(1); template.sendBody("direct:start", "Hello World"); @@ -34,13 +54,40 @@ public class MulticastPipelineTest extends ContextTestSupport { assertMockEndpointsSatisfied(); } - @Override - protected RouteBuilder createRouteBuilder() throws Exception { - return new RouteBuilder() { + public void testPlainPipelineTo() throws Exception { + context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { from("direct:start") - .multicast().aggregationStrategy(AggregationStrategies.groupedExchange()) + .pipeline().to("direct:a", "direct:b").end() + .pipeline().to("direct:c", "direct:d").end() + .to("mock:result"); + + from("direct:a").to("mock:a").setBody().constant("A"); + from("direct:b").to("mock:b").setBody().constant("B"); + from("direct:c").to("mock:c").setBody().constant("C"); + from("direct:d").to("mock:d").setBody().constant("D"); + } + }); + context.start(); + + getMockEndpoint("mock:a").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:b").expectedBodiesReceived("A"); + getMockEndpoint("mock:c").expectedBodiesReceived("B"); + getMockEndpoint("mock:d").expectedBodiesReceived("C"); + getMockEndpoint("mock:result").expectedMessageCount(1); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + public void testMulticastPipeline() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .multicast() .pipeline("direct:a", "direct:b") .pipeline("direct:c", "direct:d") .end() @@ -51,6 +98,48 @@ public class MulticastPipelineTest extends ContextTestSupport { from("direct:c").to("mock:c").setBody().constant("C"); from("direct:d").to("mock:d").setBody().constant("D"); } - }; + }); + context.start(); + + getMockEndpoint("mock:a").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:b").expectedBodiesReceived("A"); + getMockEndpoint("mock:c").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:d").expectedBodiesReceived("C"); + getMockEndpoint("mock:result").expectedMessageCount(1); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); } + + public void testMulticastPipelineTo() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .multicast() + .pipeline().to("direct:a", "direct:b").end() + .pipeline().to("direct:c", "direct:d").end() + .end() + .to("mock:result"); + + from("direct:a").to("mock:a").setBody().constant("A"); + from("direct:b").to("mock:b").setBody().constant("B"); + from("direct:c").to("mock:c").setBody().constant("C"); + from("direct:d").to("mock:d").setBody().constant("D"); + } + }); + context.start(); + + getMockEndpoint("mock:a").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:b").expectedBodiesReceived("A"); + getMockEndpoint("mock:c").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:d").expectedBodiesReceived("C"); + getMockEndpoint("mock:result").expectedMessageCount(1); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + }