Repository: camel Updated Branches: refs/heads/master 9b8397f08 -> 9020df3be
CAMEL-7833 InOnly and InOut routes as Observable<Exchange> sequences Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/33ebe714 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/33ebe714 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/33ebe714 Branch: refs/heads/master Commit: 33ebe714a9b304632eaeed2bdd0859d583b41563 Parents: 9b8397f Author: Jyrki Ruuskanen <yur...@kotikone.fi> Authored: Sun Apr 5 12:59:12 2015 +0300 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Mon Apr 6 15:38:47 2015 +0800 ---------------------------------------------------------------------- components/camel-rx/pom.xml | 9 ++++ .../java/org/apache/camel/rx/CamelOperator.java | 24 +++------ .../java/org/apache/camel/rx/ReactiveCamel.java | 28 ++++++++++- .../org/apache/camel/rx/CamelOperatorTest.java | 53 +++++++++++++++++--- .../src/test/resources/log4j.properties | 2 +- 5 files changed, 87 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/33ebe714/components/camel-rx/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-rx/pom.xml b/components/camel-rx/pom.xml index cb4827f..63baf5f 100644 --- a/components/camel-rx/pom.xml +++ b/components/camel-rx/pom.xml @@ -39,6 +39,10 @@ <groupId>org.apache.camel</groupId> <artifactId>camel-core</artifactId> </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-restlet</artifactId> + </dependency> <dependency> <groupId>io.reactivex</groupId> @@ -62,6 +66,11 @@ <artifactId>junit</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>com.jayway.restassured</groupId> + <artifactId>rest-assured</artifactId> + <version>2.3.0</version> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/camel/blob/33ebe714/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java ---------------------------------------------------------------------- diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java b/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java index 2a6fa3a..917f069 100644 --- a/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java +++ b/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java @@ -19,14 +19,13 @@ package org.apache.camel.rx; import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; -import org.apache.camel.Message; -import org.apache.camel.Producer; import org.apache.camel.ProducerTemplate; +import org.apache.camel.processor.PipelineHelper; import org.apache.camel.util.ServiceHelper; import rx.Observable; import rx.Subscriber; -public class CamelOperator implements Observable.Operator<Message, Message> { +public class CamelOperator implements Observable.Operator<Exchange, Exchange> { private ProducerTemplate producerTemplate; private Endpoint endpoint; @@ -44,8 +43,8 @@ public class CamelOperator implements Observable.Operator<Message, Message> { } @Override - public Subscriber<? super Message> call(final Subscriber<? super Message> s) { - return new Subscriber<Message>(s) { + public Subscriber<? super Exchange> call(final Subscriber<? super Exchange> s) { + return new Subscriber<Exchange>(s) { @Override public void onCompleted() { try { @@ -70,19 +69,14 @@ public class CamelOperator implements Observable.Operator<Message, Message> { } @Override - public void onNext(Message item) { + public void onNext(Exchange item) { if (!s.isUnsubscribed()) { Exchange exchange = process(item); if (exchange.getException() != null) { s.onError(exchange.getException()); } else { - if (exchange.hasOut()) { - s.onNext(exchange.getOut()); - } else { - s.onNext(exchange.getIn()); - } + s.onNext(PipelineHelper.createNextExchange(exchange)); } - } } }; @@ -96,10 +90,4 @@ public class CamelOperator implements Observable.Operator<Message, Message> { } return exchange; } - - private Exchange process(Message message) { - Exchange exchange = endpoint.createExchange(); - exchange.setIn(message); - return process(exchange); - } } http://git-wip-us.apache.org/repos/asf/camel/blob/33ebe714/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java ---------------------------------------------------------------------- diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java b/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java index 678c4e8..e0eb869 100644 --- a/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java +++ b/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java @@ -23,7 +23,6 @@ import org.apache.camel.Message; import org.apache.camel.rx.support.EndpointObservable; import org.apache.camel.rx.support.EndpointSubscribeFunc; import org.apache.camel.rx.support.ExchangeToBodyFunc1; -import org.apache.camel.rx.support.ExchangeToMessageFunc1; import org.apache.camel.rx.support.ObserverSender; import org.apache.camel.util.CamelContextHelper; import rx.Observable; @@ -62,7 +61,7 @@ public class ReactiveCamel { * to be processed using <a href="https://rx.codeplex.com/">Reactive Extensions</a> */ public Observable<Message> toObservable(Endpoint endpoint) { - return createEndpointObservable(endpoint, ExchangeToMessageFunc1.getInstance()); + return toObservable(endpoint, Message.class); } /** @@ -93,6 +92,20 @@ public class ReactiveCamel { } /** + * Convenience method for beginning the route + */ + public Observable<Exchange> from(Endpoint endpoint) { + return createEndpointObservable(endpoint); + } + + /** + * Convenience method for beginning the route + */ + public Observable<Exchange> from(String uri) { + return from(endpoint(uri)); + } + + /** * Convenience method for creating CamelOperator instances */ public CamelOperator to(String uri) throws Exception { @@ -124,4 +137,15 @@ public class ReactiveCamel { return new EndpointObservable<T>(endpoint, func); } + /** + * Return a newly created {@link Observable} without conversion + */ + protected Observable<Exchange> createEndpointObservable(final Endpoint endpoint) { + return new EndpointObservable<Exchange>(endpoint, new EndpointSubscribeFunc<>(endpoint, new Func1<Exchange, Exchange>() { + @Override + public Exchange call(Exchange exchange) { + return exchange; + } + })); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/33ebe714/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java index f0bacc3..8d09858 100644 --- a/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java +++ b/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java @@ -18,15 +18,18 @@ package org.apache.camel.rx; import java.util.concurrent.TimeUnit; -import org.apache.camel.Message; +import org.apache.camel.Exchange; import org.apache.camel.component.mock.MockEndpoint; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import rx.Observable; import rx.Subscription; +import rx.functions.Func1; import rx.observables.ConnectableObservable; +import static com.jayway.restassured.RestAssured.*; +import static org.hamcrest.Matchers.*; + /** */ public class CamelOperatorTest extends RxTestSupport { @@ -37,30 +40,64 @@ public class CamelOperatorTest extends RxTestSupport { final MockEndpoint mockEndpoint1 = camelContext.getEndpoint("mock:results1", MockEndpoint.class); final MockEndpoint mockEndpoint2 = camelContext.getEndpoint("mock:results2", MockEndpoint.class); final MockEndpoint mockEndpoint3 = camelContext.getEndpoint("mock:results3", MockEndpoint.class); + final MockEndpoint mockEndpoint4 = camelContext.getEndpoint("mock:results4", MockEndpoint.class); mockEndpoint1.expectedMessageCount(2); mockEndpoint2.expectedMessageCount(1); mockEndpoint3.expectedMessageCount(1); + mockEndpoint4.expectedMessageCount(2); - ConnectableObservable<Message> route = reactiveCamel.toObservable("direct:start") + // Define an InOnly route + ConnectableObservable<Exchange> inOnly = reactiveCamel.from("direct:start") .lift(new CamelOperator(mockEndpoint1)) - .lift(new CamelOperator(camelContext, "log:foo")) + .lift(new CamelOperator(camelContext, "log:inOnly")) .debounce(1, TimeUnit.SECONDS) .lift(reactiveCamel.to(mockEndpoint2)) .lift(reactiveCamel.to("mock:results3")) .publish(); // Start the route - Subscription routeSubscription = route.connect(); + Subscription inSubscription = inOnly.connect(); // Send two test messages - producerTemplate.sendBody("direct:start", "<test/>"); - producerTemplate.sendBody("direct:start", "<test/>"); + producerTemplate.sendBody("direct:start", "<test1/>"); + producerTemplate.sendBody("direct:start", "<test2/>"); + + // Define an InOut route + ConnectableObservable<Exchange> inOut = reactiveCamel.from("restlet:http://localhost:9080/test?restletMethod=POST") + .map(new Func1<Exchange, Exchange>() { // Convert body to String + @Override + public Exchange call(Exchange exchange) { + exchange.getIn().setBody(exchange.getIn().getBody(String.class)); + return exchange; + } + }) + .lift(reactiveCamel.to("log:inOut")) + .map(new Func1<Exchange, Exchange>() { // Change body for response + @Override + public Exchange call(Exchange exchange) { + exchange.getIn().setBody(exchange.getIn().getBody(String.class) + " back"); + return exchange; + } + }) + .lift(reactiveCamel.to(mockEndpoint4)) + .publish(); + + // Start the route + Subscription inoutSubscription = inOut.connect(); + + // Send two messages and check the responses + given().body("hello").when().post("http://localhost:9080/test").then().assertThat().body(containsString("hello back")); + given().body("holla").when().post("http://localhost:9080/test").then().assertThat().body(containsString("holla back")); mockEndpoint1.assertIsSatisfied(); mockEndpoint2.assertIsSatisfied(); mockEndpoint3.assertIsSatisfied(); + mockEndpoint4.assertIsSatisfied(); + + // Stop the route + inSubscription.unsubscribe(); // Stop the route - routeSubscription.unsubscribe(); + inoutSubscription.unsubscribe(); } } http://git-wip-us.apache.org/repos/asf/camel/blob/33ebe714/components/camel-rx/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/components/camel-rx/src/test/resources/log4j.properties b/components/camel-rx/src/test/resources/log4j.properties index 747baca..4fbe6ae 100644 --- a/components/camel-rx/src/test/resources/log4j.properties +++ b/components/camel-rx/src/test/resources/log4j.properties @@ -32,5 +32,5 @@ log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.layout=org.apache.log4j.PatternLayout log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n -log4j.appender.file.file=target/camel-tx-test.log +log4j.appender.file.file=target/camel-rx-test.log log4j.appender.file.append=true