Repository: camel
Updated Branches:
  refs/heads/master 9b8397f08 -> 9020df3be


CAMEL-7833 InOnly and InOut routes as Observable<Exchange> sequences


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/33ebe714
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/33ebe714
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/33ebe714

Branch: refs/heads/master
Commit: 33ebe714a9b304632eaeed2bdd0859d583b41563
Parents: 9b8397f
Author: Jyrki Ruuskanen <yur...@kotikone.fi>
Authored: Sun Apr 5 12:59:12 2015 +0300
Committer: Willem Jiang <willem.ji...@gmail.com>
Committed: Mon Apr 6 15:38:47 2015 +0800

----------------------------------------------------------------------
 components/camel-rx/pom.xml                     |  9 ++++
 .../java/org/apache/camel/rx/CamelOperator.java | 24 +++------
 .../java/org/apache/camel/rx/ReactiveCamel.java | 28 ++++++++++-
 .../org/apache/camel/rx/CamelOperatorTest.java  | 53 +++++++++++++++++---
 .../src/test/resources/log4j.properties         |  2 +-
 5 files changed, 87 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/33ebe714/components/camel-rx/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-rx/pom.xml b/components/camel-rx/pom.xml
index cb4827f..63baf5f 100644
--- a/components/camel-rx/pom.xml
+++ b/components/camel-rx/pom.xml
@@ -39,6 +39,10 @@
       <groupId>org.apache.camel</groupId>
       <artifactId>camel-core</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-restlet</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>io.reactivex</groupId>
@@ -62,6 +66,11 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.jayway.restassured</groupId>
+      <artifactId>rest-assured</artifactId>
+      <version>2.3.0</version>
+    </dependency>
   </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/camel/blob/33ebe714/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java 
b/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
index 2a6fa3a..917f069 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
@@ -19,14 +19,13 @@ package org.apache.camel.rx;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.Producer;
 import org.apache.camel.ProducerTemplate;
+import org.apache.camel.processor.PipelineHelper;
 import org.apache.camel.util.ServiceHelper;
 import rx.Observable;
 import rx.Subscriber;
 
-public class CamelOperator implements Observable.Operator<Message, Message> {
+public class CamelOperator implements Observable.Operator<Exchange, Exchange> {
 
     private ProducerTemplate producerTemplate;
     private Endpoint endpoint;
@@ -44,8 +43,8 @@ public class CamelOperator implements 
Observable.Operator<Message, Message> {
     }
 
     @Override
-    public Subscriber<? super Message> call(final Subscriber<? super Message> 
s) {
-        return new Subscriber<Message>(s) {
+    public Subscriber<? super Exchange> call(final Subscriber<? super 
Exchange> s) {
+        return new Subscriber<Exchange>(s) {
             @Override
             public void onCompleted() {
                 try {
@@ -70,19 +69,14 @@ public class CamelOperator implements 
Observable.Operator<Message, Message> {
             }
 
             @Override
-            public void onNext(Message item) {
+            public void onNext(Exchange item) {
                 if (!s.isUnsubscribed()) {
                     Exchange exchange = process(item);
                     if (exchange.getException() != null) {
                         s.onError(exchange.getException());
                     } else {
-                        if (exchange.hasOut()) {
-                            s.onNext(exchange.getOut());
-                        } else {
-                            s.onNext(exchange.getIn());
-                        }
+                        s.onNext(PipelineHelper.createNextExchange(exchange));
                     }
-
                 }
             }
         };
@@ -96,10 +90,4 @@ public class CamelOperator implements 
Observable.Operator<Message, Message> {
         }
         return exchange;
     }
-
-    private Exchange process(Message message) {
-        Exchange exchange = endpoint.createExchange();
-        exchange.setIn(message);
-        return process(exchange);
-    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/33ebe714/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java 
b/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
index 678c4e8..e0eb869 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
@@ -23,7 +23,6 @@ import org.apache.camel.Message;
 import org.apache.camel.rx.support.EndpointObservable;
 import org.apache.camel.rx.support.EndpointSubscribeFunc;
 import org.apache.camel.rx.support.ExchangeToBodyFunc1;
-import org.apache.camel.rx.support.ExchangeToMessageFunc1;
 import org.apache.camel.rx.support.ObserverSender;
 import org.apache.camel.util.CamelContextHelper;
 import rx.Observable;
@@ -62,7 +61,7 @@ public class ReactiveCamel {
      * to be processed using  <a href="https://rx.codeplex.com/">Reactive Extensions</a>
      */
     public Observable<Message> toObservable(Endpoint endpoint) {
-        return createEndpointObservable(endpoint, 
ExchangeToMessageFunc1.getInstance());
+        return toObservable(endpoint, Message.class);
     }
 
     /**
@@ -93,6 +92,20 @@ public class ReactiveCamel {
     }
 
     /**
+     * Convenience method for beginning the route
+     */
+    public Observable<Exchange> from(Endpoint endpoint) {
+        return createEndpointObservable(endpoint);
+    }
+
+    /**
+     * Convenience method for beginning the route
+     */
+    public Observable<Exchange> from(String uri) {
+        return from(endpoint(uri));
+    }
+
+    /**
      * Convenience method for creating CamelOperator instances
      */
     public CamelOperator to(String uri) throws Exception {
@@ -124,4 +137,15 @@ public class ReactiveCamel {
         return new EndpointObservable<T>(endpoint, func);
     }
 
+    /**
+     * Return a newly created {@link Observable} without conversion
+     */
+    protected Observable<Exchange> createEndpointObservable(final Endpoint 
endpoint) {
+        return new EndpointObservable<Exchange>(endpoint, new 
EndpointSubscribeFunc<>(endpoint, new Func1<Exchange, Exchange>() {
+            @Override
+            public Exchange call(Exchange exchange) {
+                return exchange;
+            }
+        }));
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/33ebe714/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java 
b/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java
index f0bacc3..8d09858 100644
--- 
a/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java
+++ 
b/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java
@@ -18,15 +18,18 @@ package org.apache.camel.rx;
 
 import java.util.concurrent.TimeUnit;
 
-import org.apache.camel.Message;
+import org.apache.camel.Exchange;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import rx.Observable;
 import rx.Subscription;
+import rx.functions.Func1;
 import rx.observables.ConnectableObservable;
 
+import static com.jayway.restassured.RestAssured.*;
+import static org.hamcrest.Matchers.*;
+
 /**
  */
 public class CamelOperatorTest extends RxTestSupport {
@@ -37,30 +40,64 @@ public class CamelOperatorTest extends RxTestSupport {
         final MockEndpoint mockEndpoint1 = 
camelContext.getEndpoint("mock:results1", MockEndpoint.class);
         final MockEndpoint mockEndpoint2 = 
camelContext.getEndpoint("mock:results2", MockEndpoint.class);
         final MockEndpoint mockEndpoint3 = 
camelContext.getEndpoint("mock:results3", MockEndpoint.class);
+        final MockEndpoint mockEndpoint4 = 
camelContext.getEndpoint("mock:results4", MockEndpoint.class);
         mockEndpoint1.expectedMessageCount(2);
         mockEndpoint2.expectedMessageCount(1);
         mockEndpoint3.expectedMessageCount(1);
+        mockEndpoint4.expectedMessageCount(2);
 
-        ConnectableObservable<Message> route = 
reactiveCamel.toObservable("direct:start")
+        // Define an InOnly route
+        ConnectableObservable<Exchange> inOnly = 
reactiveCamel.from("direct:start")
             .lift(new CamelOperator(mockEndpoint1))
-            .lift(new CamelOperator(camelContext, "log:foo"))
+            .lift(new CamelOperator(camelContext, "log:inOnly"))
             .debounce(1, TimeUnit.SECONDS)
             .lift(reactiveCamel.to(mockEndpoint2))
             .lift(reactiveCamel.to("mock:results3"))
             .publish();
 
         // Start the route
-        Subscription routeSubscription = route.connect();
+        Subscription inSubscription = inOnly.connect();
 
         // Send two test messages
-        producerTemplate.sendBody("direct:start", "<test/>");
-        producerTemplate.sendBody("direct:start", "<test/>");
+        producerTemplate.sendBody("direct:start", "<test1/>");
+        producerTemplate.sendBody("direct:start", "<test2/>");
+
+        // Define an InOut route
+        ConnectableObservable<Exchange> inOut = reactiveCamel.from("restlet:http://localhost:9080/test?restletMethod=POST")
+            .map(new Func1<Exchange, Exchange>() { // Convert body to String
+                @Override
+                public Exchange call(Exchange exchange) {
+                    
exchange.getIn().setBody(exchange.getIn().getBody(String.class));
+                    return exchange;
+                }
+            })
+            .lift(reactiveCamel.to("log:inOut"))
+            .map(new Func1<Exchange, Exchange>() { // Change body for response
+                @Override
+                public Exchange call(Exchange exchange) {
+                    
exchange.getIn().setBody(exchange.getIn().getBody(String.class) + " back");
+                    return exchange;
+                }
+            })
+            .lift(reactiveCamel.to(mockEndpoint4))
+            .publish();
+
+        // Start the route
+        Subscription inoutSubscription = inOut.connect();
+
+        // Send two messages and check the responses
+        given().body("hello").when().post("http://localhost:9080/test").then().assertThat().body(containsString("hello back"));
+        given().body("holla").when().post("http://localhost:9080/test").then().assertThat().body(containsString("holla back"));
 
         mockEndpoint1.assertIsSatisfied();
         mockEndpoint2.assertIsSatisfied();
         mockEndpoint3.assertIsSatisfied();
+        mockEndpoint4.assertIsSatisfied();
+
+        // Stop the route
+        inSubscription.unsubscribe();
 
         // Stop the route
-        routeSubscription.unsubscribe();
+        inoutSubscription.unsubscribe();
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/33ebe714/components/camel-rx/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/test/resources/log4j.properties 
b/components/camel-rx/src/test/resources/log4j.properties
index 747baca..4fbe6ae 100644
--- a/components/camel-rx/src/test/resources/log4j.properties
+++ b/components/camel-rx/src/test/resources/log4j.properties
@@ -32,5 +32,5 @@ log4j.appender.out.layout.ConversionPattern=[%30.30t] 
%-30.30c{1} %-5p %m%n
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - 
%m%n
-log4j.appender.file.file=target/camel-tx-test.log
+log4j.appender.file.file=target/camel-rx-test.log
 log4j.appender.file.append=true

Reply via email to