CAMEL-10612: fixing unwrap stream processor with std and empty data

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a7e838d9
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a7e838d9
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a7e838d9

Branch: refs/heads/master
Commit: a7e838d963b76ef0d943efef03f5d4c6eaaf4624
Parents: 0f9b93b
Author: Nicola Ferraro <ni.ferr...@gmail.com>
Authored: Fri Feb 3 10:01:17 2017 +0100
Committer: Nicola Ferraro <ni.ferr...@gmail.com>
Committed: Fri Feb 3 13:46:47 2017 +0100

----------------------------------------------------------------------
 .../streams/util/UnwrapStreamProcessor.java     |   6 +-
 .../reactive/streams/BeanCallTest.java          | 124 ++++++++++++++++++-
 2 files changed, 126 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a7e838d9/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java
 
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java
index 6800ce0..3fb1a8a 100644
--- 
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java
+++ 
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java
@@ -78,8 +78,12 @@ public class UnwrapStreamProcessor implements AsyncProcessor 
{
                 }
 
             });
+
+            return false;
         }
-        return false;
+
+        callback.done(true);
+        return true;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/a7e838d9/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
 
b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
index 5a532ae..1b97382 100644
--- 
a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
+++ 
b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
@@ -16,6 +16,10 @@
  */
 package org.apache.camel.component.reactive.streams;
 
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
 import io.reactivex.Flowable;
 
 import org.apache.camel.Exchange;
@@ -43,7 +47,6 @@ public class BeanCallTest extends CamelTestSupport {
                 from("direct:num")
                         .bean(BeanCallTest.this, "processBody")
                         .process(new UnwrapStreamProcessor()) // Can be 
removed?
-                        .split().body()
                         .to("mock:endpoint");
 
                 from("direct:handle")
@@ -76,7 +79,6 @@ public class BeanCallTest extends CamelTestSupport {
                 from("direct:num")
                         .bean(BeanCallTest.this, "processBodyWrongType")
                         .process(new UnwrapStreamProcessor()) // Can be 
removed?
-                        .split().body()
                         .to("mock:endpoint");
 
                 from("direct:handle")
@@ -108,7 +110,6 @@ public class BeanCallTest extends CamelTestSupport {
                 from("direct:num")
                         .bean(BeanCallTest.this, "processHeader")
                         .process(new UnwrapStreamProcessor()) // Can be 
removed?
-                        .split().body()
                         .to("mock:endpoint");
 
                 from("direct:handle")
@@ -129,6 +130,110 @@ public class BeanCallTest extends CamelTestSupport {
         assertEquals("HelloHeader 2", exchange.getIn().getBody());
     }
 
+    @Test
+    public void beanCallEmptyPublisherTest() throws Exception {
+        new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+
+                onException(Throwable.class).to("direct:handle").handled(true);
+
+                from("direct:num")
+                        .bean(BeanCallTest.this, "processBodyEmpty")
+                        .process(new UnwrapStreamProcessor()) // Can be 
removed?
+                        .to("mock:endpoint");
+
+                from("direct:handle")
+                        .setBody().constant("ERR")
+                        .to("mock:endpoint");
+
+            }
+        }.addRoutesToCamelContext(context);
+
+        MockEndpoint mock = getMockEndpoint("mock:endpoint");
+        mock.expectedMessageCount(1);
+
+        context.start();
+
+        template.sendBody("direct:num", 1);
+        mock.assertIsSatisfied();
+
+        Exchange exchange = mock.getExchanges().get(0);
+        Object body = exchange.getIn().getBody();
+        assertEquals(new Integer(1), body); // unchanged
+    }
+
+    @Test
+    public void beanCallTwoElementsTest() throws Exception {
+        new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+
+                onException(Throwable.class).to("direct:handle").handled(true);
+
+                from("direct:num")
+                        .bean(BeanCallTest.this, "processBodyTwoItems")
+                        .process(new UnwrapStreamProcessor()) // Can be 
removed?
+                        .to("mock:endpoint");
+
+                from("direct:handle")
+                        .setBody().constant("ERR")
+                        .to("mock:endpoint");
+
+            }
+        }.addRoutesToCamelContext(context);
+
+        MockEndpoint mock = getMockEndpoint("mock:endpoint");
+        mock.expectedMessageCount(1);
+
+        context.start();
+
+        template.sendBody("direct:num", 1);
+        mock.assertIsSatisfied();
+
+        Exchange exchange = mock.getExchanges().get(0);
+        Object body = exchange.getIn().getBody();
+        assertTrue(body instanceof Collection);
+        @SuppressWarnings("unchecked")
+        List<String> data = new LinkedList<>((Collection<String>) body);
+        assertListSize(data, 2);
+        assertEquals("HelloBody 1", data.get(0));
+        assertEquals("HelloBody 1", data.get(1));
+    }
+
+    @Test
+    public void beanCallStdReturnTypeTest() throws Exception {
+        new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+
+                onException(Throwable.class).to("direct:handle").handled(true);
+
+                from("direct:num")
+                        .bean(BeanCallTest.this, "processBodyStd")
+                        .process(new UnwrapStreamProcessor()) // Can be 
removed?
+                        .to("mock:endpoint");
+
+                from("direct:handle")
+                        .setBody().constant("ERR")
+                        .to("mock:endpoint");
+
+            }
+        }.addRoutesToCamelContext(context);
+
+        MockEndpoint mock = getMockEndpoint("mock:endpoint");
+        mock.expectedMessageCount(1);
+
+        context.start();
+
+        template.sendBody("direct:num", 1);
+        mock.assertIsSatisfied();
+
+        Exchange exchange = mock.getExchanges().get(0);
+        Object body = exchange.getIn().getBody();
+        assertEquals("Hello", body);
+    }
+
     public Publisher<String> processBody(Publisher<Integer> data) {
         return Flowable.fromPublisher(data)
                 .map(l -> "HelloBody " + l);
@@ -144,6 +249,19 @@ public class BeanCallTest extends CamelTestSupport {
                 .map(l -> "HelloHeader " + l);
     }
 
+    public Publisher<String> processBodyTwoItems(Publisher<Integer> data) {
+        return Flowable.fromPublisher(data).mergeWith(data)
+                .map(l -> "HelloBody " + l);
+    }
+
+    public Publisher<String> processBodyEmpty(Publisher<Integer> data) {
+        return Flowable.empty();
+    }
+
+    public String processBodyStd(Publisher<Integer> data) {
+        return "Hello";
+    }
+
     @Override
     public boolean isUseRouteBuilder() {
         return false;

Reply via email to