RXPage edited by James StrachanChanges (1)
Full ContentCamel RXThe camel-rx library provides Camel support for the Reactive Extensions (RX) using the RxJava library so that:
Background on RXYou can think of RX as providing an API similar to Java 8 / Groovy / Scala collections (methods like filter, map, zip etc) - but which operates on an asynchronous stream of events rather than a collection. So you could think of RX as like working with asynchronous push based collections (rather than the traditional synchronous pull based collections). In RX you work with an Observable<T> which behaves quite like a Collection<T> in Java 8 so you can filter/map/concat and so forth. The Observable<T> then acts as a typesafe composable API for working with asynchronous events in a collection-like way. Observing events on Camel endpointsYou can create an Observable<Message> from any endpoint using the ReactiveCamel helper class and the toObservable() method.
import org.apache.camel.rx.*; ReactiveCamel rx = new ReactiveCamel(camelContext); Observable<Message> observable = rx.toObservable("activemq:MyMessages"); // we can now call filter/map/concat etc filtered = observable.filter(m -> m.getHeader("foo") != null).map(m -> "Hello " + m.getBody()); If you know the type of the body of the message, you can use an overloaded version of toObservable() to pass in the class and get a typesafe Observable<T> back: import org.apache.camel.rx.*; ReactiveCamel rx = new ReactiveCamel(camelContext); Observable<Order> observable = rx.toObservable("seda:orders", Order.class); // now lets filter and map using Java 7 Observable<String> largeOrderIds = observable.filter(new Func1<Order, Boolean>() { public Boolean call(Order order) { return order.getAmount() > 100.0; } }).map(new Func1<Order, String>() { public String call(Order order) { return order.getId(); } }); Sending Observable<T> events to Camel endpointsIf you have an Observable<T> from some other library; or have created one from a Future<T> using RxJava and you wish to send the events on the observable to a Camel endpoint you can use the sendTo() method on ReactiveCamel: import org.apache.camel.rx.*; // take some observable from somewhere Observable<T> observable = ...; ReactiveCamel rx = new ReactiveCamel(camelContext); // lets send the events to a message queue rx.sendTo(observable, "activemq:MyQueue");
Change Notification Preferences
View Online
|
View Changes
|
Add Comment
|
- [CONF] Apache Camel > RX confluence
- [CONF] Apache Camel > RX confluence
- [CONF] Apache Camel > RX confluence
- [CONF] Apache Camel > RX confluence
- [CONF] Apache Camel > RX confluence
- [CONF] Apache Camel > RX confluence
- [CONF] Apache Camel > RX confluence
- [CONF] Apache Camel > RX confluence
- [CONF] Apache Camel > RX confluence
- [CONF] Apache Camel > RX confluence
- [CONF] Apache Camel > RX confluence
- [CONF] Apache Camel > RX confluence
- [CONF] Apache Camel > RX confluence
- [CONF] Apache Camel > RX confluence
- [CONF] Apache Camel > RX confluence
- [CONF] Apache Camel > RX confluence
- [CONF] Apache Camel > RX confluence
- [CONF] Apache Camel > RX confluence