Author: jstrachan
Date: Mon Mar  4 07:06:12 2013
New Revision: 1452197

URL: http://svn.apache.org/r1452197
Log:
Added an overloaded version of toObservable() that takes the body type as an 
argument, so that the message body can be extracted and processed directly as 
an Observable in cases where the user knows the type of the payload in a 
message.

Added:
    
camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java
      - copied, changed from r1452192, 
camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java
    
camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/Order.java   
(with props)
Modified:
    
camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
    
camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/RxTestSupport.java

Modified: 
camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java?rev=1452197&r1=1452196&r2=1452197&view=diff
==============================================================================
--- 
camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
 (original)
+++ 
camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
 Mon Mar  4 07:06:12 2013
@@ -49,6 +49,17 @@ public class ReactiveCamel {
     }
 
     /**
+     * Returns an {@link rx.Observable <T>} for the messages with their 
payload converted to the given type
+     * to allow the messages sent on the endpoint
+     * to be processed using  <a href="https://rx.codeplex.com/">Reactive 
Extensions</a>
+     */
+    public <T> Observable<T> toObservable(String uri, final Class<T> bodyType) 
{
+        return toObservable(camelContext.getEndpoint(uri), bodyType);
+    }
+
+
+
+    /**
      * Returns an {@link rx.Observable < org.apache.camel.Message >} to allow 
the messages sent on the endpoint
      * to be processed using  <a href="https://rx.codeplex.com/">Reactive 
Extensions</a>
      */
@@ -61,6 +72,21 @@ public class ReactiveCamel {
         });
     }
 
+    /**
+     * Returns an {@link rx.Observable <T>} for the messages with their 
payload converted to the given type
+     * to allow the messages sent on the endpoint
+     * to be processed using  <a href="https://rx.codeplex.com/">Reactive 
Extensions</a>
+     */
+    public <T> Observable<T> toObservable(Endpoint endpoint, final Class<T> 
bodyType) {
+        return createEndpointObservable(endpoint, new Func1<Exchange, T>() {
+            @Override
+            public T call(Exchange exchange) {
+                Message in = exchange.getIn();
+                return in.getBody(bodyType);
+            }
+        });
+    }
+
     public CamelContext getCamelContext() {
         return camelContext;
     }

Copied: 
camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java
 (from r1452192, 
camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java?p2=camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java&p1=camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java&r1=1452192&r2=1452197&rev=1452197&view=diff
==============================================================================
--- 
camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java
 (original)
+++ 
camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java
 Mon Mar  4 07:06:12 2013
@@ -17,7 +17,6 @@
  */
 package org.apache.camel.rx;
 
-import org.apache.camel.Message;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -25,27 +24,48 @@ import org.slf4j.LoggerFactory;
 
 import rx.Observable;
 import rx.util.functions.Action1;
+import rx.util.functions.Func1;
 
 /**
  */
-public class ObservableMessageTest extends RxTestSupport {
-    private static final transient Logger LOG = 
LoggerFactory.getLogger(ObservableMessageTest.class);
+public class ObservableBodyTest extends RxTestSupport {
+    private static final transient Logger LOG = 
LoggerFactory.getLogger(ObservableBodyTest.class);
 
     @Test
     public void testConsume() throws Exception {
         final MockEndpoint mockEndpoint = 
camelContext.getEndpoint("mock:results", MockEndpoint.class);
-        mockEndpoint.expectedMessageCount(4);
+        mockEndpoint.expectedBodiesReceived("b", "d");
 
-        Observable<Message> observable = 
reactiveCamel.toObservable("timer://foo?fixedRate=true&period=100");
-        observable.take(4).subscribe(new Action1<Message>() {
+        // lets consume, filter and map events
+        Observable<Order> observable = 
reactiveCamel.toObservable("seda:orders", Order.class);
+        Observable<String> largeOrderIds = observable.filter(new Func1<Order, 
Boolean>() {
+            public Boolean call(Order order) {
+                return order.getAmount() > 100.0;
+            }
+        }).map(new Func1<Order, String>() {
+            public String call(Order order) {
+                return order.getId();
+            }
+        });
+
+
+        // lets route the largeOrderIds to the mock endpoint for testing
+        largeOrderIds.take(2).subscribe(new Action1<String>() {
             @Override
-            public void call(Message message) {
-                String body = "Processing message headers " + 
message.getHeaders();
-                LOG.info(body);
+            public void call(String body) {
+                LOG.info("Processing  " + body);
                 producerTemplate.sendBody(mockEndpoint, body);
             }
         });
 
+
+        // now lets send some orders in
+        Order[] orders = {new Order("a", 49.95), new Order("b", 125.50), new 
Order("c", 22.95),
+                new Order("d", 259.95), new Order("e", 1.25)};
+        for (Order order : orders) {
+            producerTemplate.sendBody("seda:orders", order);
+        }
+
         mockEndpoint.assertIsSatisfied();
     }
 }

Added: 
camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/Order.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/Order.java?rev=1452197&view=auto
==============================================================================
--- 
camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/Order.java 
(added)
+++ 
camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/Order.java 
Mon Mar  4 07:06:12 2013
@@ -0,0 +1,42 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx;
+
+/**
+ */
+public class Order {
+    private String id;
+    private double amount;
+
+    public Order(String id, double amount) {
+        this.amount = amount;
+        this.id = id;
+    }
+
+    public String toString() {
+        return "Order[id " + id + ", amount " + amount + "]";
+    }
+
+    public double getAmount() {
+        return amount;
+    }
+
+    public String getId() {
+        return id;
+    }
+}

Propchange: 
camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/Order.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/RxTestSupport.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/RxTestSupport.java?rev=1452197&r1=1452196&r2=1452197&view=diff
==============================================================================
--- 
camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/RxTestSupport.java
 (original)
+++ 
camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/RxTestSupport.java
 Mon Mar  4 07:06:12 2013
@@ -36,10 +36,12 @@ public abstract class RxTestSupport {
         reactiveCamel = new ReactiveCamel(camelContext);
         producerTemplate = camelContext.createProducerTemplate();
         camelContext.start();
+        producerTemplate.start();
     }
 
     @After
     public void destroy() throws Exception {
+        producerTemplate.stop();
         camelContext.stop();
     }
 }


Reply via email to