CAMEL-10612: Fix UnwrapStreamProcessor for standard (non-Publisher) return values and empty publishers
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a7e838d9 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a7e838d9 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a7e838d9 Branch: refs/heads/master Commit: a7e838d963b76ef0d943efef03f5d4c6eaaf4624 Parents: 0f9b93b Author: Nicola Ferraro <ni.ferr...@gmail.com> Authored: Fri Feb 3 10:01:17 2017 +0100 Committer: Nicola Ferraro <ni.ferr...@gmail.com> Committed: Fri Feb 3 13:46:47 2017 +0100 ---------------------------------------------------------------------- .../streams/util/UnwrapStreamProcessor.java | 6 +- .../reactive/streams/BeanCallTest.java | 124 ++++++++++++++++++- 2 files changed, 126 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/a7e838d9/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java index 6800ce0..3fb1a8a 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java @@ -78,8 +78,12 @@ public class UnwrapStreamProcessor implements AsyncProcessor { } }); + + return false; } - return false; + + callback.done(true); + return true; } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/a7e838d9/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java 
---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java index 5a532ae..1b97382 100644 --- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java +++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java @@ -16,6 +16,10 @@ */ package org.apache.camel.component.reactive.streams; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; + import io.reactivex.Flowable; import org.apache.camel.Exchange; @@ -43,7 +47,6 @@ public class BeanCallTest extends CamelTestSupport { from("direct:num") .bean(BeanCallTest.this, "processBody") .process(new UnwrapStreamProcessor()) // Can be removed? - .split().body() .to("mock:endpoint"); from("direct:handle") @@ -76,7 +79,6 @@ public class BeanCallTest extends CamelTestSupport { from("direct:num") .bean(BeanCallTest.this, "processBodyWrongType") .process(new UnwrapStreamProcessor()) // Can be removed? - .split().body() .to("mock:endpoint"); from("direct:handle") @@ -108,7 +110,6 @@ public class BeanCallTest extends CamelTestSupport { from("direct:num") .bean(BeanCallTest.this, "processHeader") .process(new UnwrapStreamProcessor()) // Can be removed? 
- .split().body() .to("mock:endpoint"); from("direct:handle") @@ -129,6 +130,110 @@ public class BeanCallTest extends CamelTestSupport { assertEquals("HelloHeader 2", exchange.getIn().getBody()); } + @Test + public void beanCallEmptyPublisherTest() throws Exception { + new RouteBuilder() { + @Override + public void configure() throws Exception { + + onException(Throwable.class).to("direct:handle").handled(true); + + from("direct:num") + .bean(BeanCallTest.this, "processBodyEmpty") + .process(new UnwrapStreamProcessor()) // Can be removed? + .to("mock:endpoint"); + + from("direct:handle") + .setBody().constant("ERR") + .to("mock:endpoint"); + + } + }.addRoutesToCamelContext(context); + + MockEndpoint mock = getMockEndpoint("mock:endpoint"); + mock.expectedMessageCount(1); + + context.start(); + + template.sendBody("direct:num", 1); + mock.assertIsSatisfied(); + + Exchange exchange = mock.getExchanges().get(0); + Object body = exchange.getIn().getBody(); + assertEquals(new Integer(1), body); // unchanged + } + + @Test + public void beanCallTwoElementsTest() throws Exception { + new RouteBuilder() { + @Override + public void configure() throws Exception { + + onException(Throwable.class).to("direct:handle").handled(true); + + from("direct:num") + .bean(BeanCallTest.this, "processBodyTwoItems") + .process(new UnwrapStreamProcessor()) // Can be removed? 
+ .to("mock:endpoint"); + + from("direct:handle") + .setBody().constant("ERR") + .to("mock:endpoint"); + + } + }.addRoutesToCamelContext(context); + + MockEndpoint mock = getMockEndpoint("mock:endpoint"); + mock.expectedMessageCount(1); + + context.start(); + + template.sendBody("direct:num", 1); + mock.assertIsSatisfied(); + + Exchange exchange = mock.getExchanges().get(0); + Object body = exchange.getIn().getBody(); + assertTrue(body instanceof Collection); + @SuppressWarnings("unchecked") + List<String> data = new LinkedList<>((Collection<String>) body); + assertListSize(data, 2); + assertEquals("HelloBody 1", data.get(0)); + assertEquals("HelloBody 1", data.get(1)); + } + + @Test + public void beanCallStdReturnTypeTest() throws Exception { + new RouteBuilder() { + @Override + public void configure() throws Exception { + + onException(Throwable.class).to("direct:handle").handled(true); + + from("direct:num") + .bean(BeanCallTest.this, "processBodyStd") + .process(new UnwrapStreamProcessor()) // Can be removed? 
+ .to("mock:endpoint"); + + from("direct:handle") + .setBody().constant("ERR") + .to("mock:endpoint"); + + } + }.addRoutesToCamelContext(context); + + MockEndpoint mock = getMockEndpoint("mock:endpoint"); + mock.expectedMessageCount(1); + + context.start(); + + template.sendBody("direct:num", 1); + mock.assertIsSatisfied(); + + Exchange exchange = mock.getExchanges().get(0); + Object body = exchange.getIn().getBody(); + assertEquals("Hello", body); + } + public Publisher<String> processBody(Publisher<Integer> data) { return Flowable.fromPublisher(data) .map(l -> "HelloBody " + l); @@ -144,6 +249,19 @@ public class BeanCallTest extends CamelTestSupport { .map(l -> "HelloHeader " + l); } + public Publisher<String> processBodyTwoItems(Publisher<Integer> data) { + return Flowable.fromPublisher(data).mergeWith(data) + .map(l -> "HelloBody " + l); + } + + public Publisher<String> processBodyEmpty(Publisher<Integer> data) { + return Flowable.empty(); + } + + public String processBodyStd(Publisher<Integer> data) { + return "Hello"; + } + @Override public boolean isUseRouteBuilder() { return false;