Updated Branches: refs/heads/master 28d3be75e -> 5d06437e5
Let's make use of clean Java generics in the camel-rx component codebase; consequently removed all the @SuppressWarnings occurrences and polished its codebase as well. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5d06437e Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5d06437e Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5d06437e Branch: refs/heads/master Commit: 5d06437e518fd1ab93ced8ae2ebf9bcbc5be0a0b Parents: 28d3be7 Author: Babak Vahdat <bvah...@apache.org> Authored: Mon Oct 28 22:18:09 2013 +0100 Committer: Babak Vahdat <bvah...@apache.org> Committed: Mon Oct 28 22:18:09 2013 +0100 ---------------------------------------------------------------------- .../java/org/apache/camel/impl/DefaultDebugger.java | 2 +- .../main/java/org/apache/camel/rx/ObservableBody.java | 3 ++- .../main/java/org/apache/camel/rx/ReactiveCamel.java | 12 +++++------- .../apache/camel/rx/support/EndpointSubscription.java | 6 +++--- .../apache/camel/rx/support/ObservableProcessor.java | 11 ++++++----- .../org/apache/camel/rx/support/ObserverSender.java | 7 +++++-- .../apache/camel/rx/support/ProcessorToObserver.java | 4 ++-- .../java/org/apache/camel/rx/ObservableBodyTest.java | 4 ++++ .../java/org/apache/camel/rx/ObservableMessageTest.java | 4 ++++ .../src/test/java/org/apache/camel/rx/Order.java | 5 +++-- .../src/test/java/org/apache/camel/rx/SendToTest.java | 2 +- .../org/apache/camel/rx/ToObservableAndMapTest.java | 1 + .../java/org/apache/camel/rx/ToObservableBodyTest.java | 2 ++ 13 files changed, 39 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/camel-core/src/main/java/org/apache/camel/impl/DefaultDebugger.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultDebugger.java 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultDebugger.java index f487fb5..84e91dd 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultDebugger.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultDebugger.java @@ -122,7 +122,7 @@ public class DefaultDebugger implements Debugger, CamelContextAware { } public void addSingleStepBreakpoint(final Breakpoint breakpoint) { - addSingleStepBreakpoint(breakpoint, null); + addSingleStepBreakpoint(breakpoint); } public void addSingleStepBreakpoint(final Breakpoint breakpoint, Condition... conditions) { http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableBody.java ---------------------------------------------------------------------- diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableBody.java b/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableBody.java index 0dc05af..737d255 100644 --- a/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableBody.java +++ b/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableBody.java @@ -28,10 +28,11 @@ public abstract class ObservableBody<T> extends ObservableProcessor<T> { private final Class<T> bodyType; public ObservableBody(Class<T> bodyType) { - super(new ExchangeToBodyFunc1(bodyType)); + super(new ExchangeToBodyFunc1<T>(bodyType)); this.bodyType = bodyType; } + @Override public String toString() { return "ObservableBody[" + bodyType.getName() + "]"; } http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java ---------------------------------------------------------------------- diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java b/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java index 769c39e..5180bb4 100644 --- a/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java 
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java @@ -86,10 +86,9 @@ public class ReactiveCamel { /** * Sends events on the given {@link Observable} to the given camel endpoint */ - @SuppressWarnings("unchecked") public <T> void sendTo(Observable<T> observable, Endpoint endpoint) { try { - ObserverSender observer = new ObserverSender(endpoint); + ObserverSender<T> observer = new ObserverSender<T>(endpoint); observable.subscribe(observer); } catch (Exception e) { throw new RuntimeCamelRxException(e); @@ -108,16 +107,15 @@ public class ReactiveCamel { * Returns a newly created {@link Observable} given a function which converts * the {@link Exchange} from the Camel consumer to the required type */ - @SuppressWarnings("unchecked") protected <T> Observable<T> createEndpointObservable(final Endpoint endpoint, final Func1<Exchange, T> converter) { - Observable.OnSubscribeFunc<Message> func = new Observable.OnSubscribeFunc<Message>() { + Observable.OnSubscribeFunc<T> func = new Observable.OnSubscribeFunc<T>() { @Override - public Subscription onSubscribe(Observer<? super Message> observer) { - return new EndpointSubscription(endpoint, observer, converter); + public Subscription onSubscribe(Observer<? 
super T> observer) { + return new EndpointSubscription<T>(endpoint, observer, converter); } }; - return new EndpointObservable(endpoint, func); + return new EndpointObservable<T>(endpoint, func); } } http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java ---------------------------------------------------------------------- diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java index 1684838..349a898 100644 --- a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java +++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java @@ -31,10 +31,10 @@ import rx.util.functions.Func1; */ public class EndpointSubscription<T> implements Subscription { private final Endpoint endpoint; - private final Observer<T> observer; + private final Observer<? super T> observer; private Consumer consumer; - public EndpointSubscription(Endpoint endpoint, final Observer<T> observer, + public EndpointSubscription(Endpoint endpoint, final Observer<? super T> observer, final Func1<Exchange, T> func) { this.endpoint = endpoint; this.observer = observer; @@ -72,7 +72,7 @@ public class EndpointSubscription<T> implements Subscription { return endpoint; } - public Observer<T> getObserver() { + public Observer<? 
super T> getObserver() { return observer; } http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObservableProcessor.java ---------------------------------------------------------------------- diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObservableProcessor.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObservableProcessor.java index 056f591..cf03583 100644 --- a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObservableProcessor.java +++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObservableProcessor.java @@ -30,14 +30,14 @@ import rx.util.functions.Func1; * so that the messages can be processed using the <a href="https://github.com/Netflix/RxJava/wiki">RX Java API</a> */ public abstract class ObservableProcessor<T> extends ServiceSupport implements Processor { - private final Subject observable = PublishSubject.create(); - private final ProcessorToObserver processor; + private final Subject<T, T> observable = PublishSubject.create(); + private final ProcessorToObserver<T> processor; - @SuppressWarnings("unchecked") protected ObservableProcessor(Func1<Exchange, T> func) { - this.processor = new ProcessorToObserver(func, observable); + this.processor = new ProcessorToObserver<T>(func, observable); } + @Override public void process(Exchange exchange) throws Exception { processor.process(exchange); } @@ -46,7 +46,6 @@ public abstract class ObservableProcessor<T> extends ServiceSupport implements P * Returns the {@link Observable} for this {@link Processor} so that the messages that are received * can be processed using the <a href="https://github.com/Netflix/RxJava/wiki">RX Java API</a> */ - @SuppressWarnings("unchecked") public Observable<T> getObservable() { return observable; } @@ -57,10 +56,12 @@ public abstract class ObservableProcessor<T> extends ServiceSupport implements P */ protected abstract void 
configure(Observable<T> observable); + @Override protected void doStart() throws Exception { configure(getObservable()); } + @Override protected void doStop() throws Exception { // noop } http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java ---------------------------------------------------------------------- diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java index 435ddd8..35f1048 100644 --- a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java +++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java @@ -27,7 +27,7 @@ import rx.Observer; /** * An {@link Observer} which sends events to a given {@link Endpoint} */ -public class ObserverSender implements Observer { +public class ObserverSender<T> implements Observer<T> { private Producer producer; public ObserverSender(Endpoint endpoint) throws Exception { @@ -35,6 +35,7 @@ public class ObserverSender implements Observer { ServiceHelper.startService(producer); } + @Override public void onCompleted() { if (producer != null) { try { @@ -47,13 +48,15 @@ public class ObserverSender implements Observer { } } + @Override public void onError(Throwable e) { Exchange exchange = producer.createExchange(); exchange.setException(e); send(exchange); } - public void onNext(Object o) { + @Override + public void onNext(T o) { Exchange exchange = producer.createExchange(); exchange.getIn().setBody(o); send(exchange); http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProcessorToObserver.java ---------------------------------------------------------------------- diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProcessorToObserver.java 
b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProcessorToObserver.java index 57d0c36..fff8668 100644 --- a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProcessorToObserver.java +++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProcessorToObserver.java @@ -29,9 +29,9 @@ import rx.util.functions.Func1; */ public class ProcessorToObserver<T> implements Processor { private final Func1<Exchange, T> func; - private final Observer<T> observer; + private final Observer<? super T> observer; - public ProcessorToObserver(Func1<Exchange, T> func, Observer<T> observer) { + public ProcessorToObserver(Func1<Exchange, T> func, Observer<? super T> observer) { this.func = func; this.observer = observer; } http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java index 3fca044..5b2cf41 100644 --- a/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java +++ b/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java @@ -52,13 +52,16 @@ public class ObservableBodyTest extends CamelTestSupport { super(String.class); } + @Override protected void configure(Observable<String> observable) { // lets process the messages using the RX API observable.map(new Func1<String, String>() { + @Override public String call(String body) { return "Hello " + body; } }).subscribe(new Action1<String>() { + @Override public void call(String body) { template.sendBody(resultEndpoint, body); } @@ -69,6 +72,7 @@ public class ObservableBodyTest extends CamelTestSupport { @Override protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { + @Override public void configure() { 
from("direct:start").process(observableBody); } http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java index b9b3851..d952797 100644 --- a/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java +++ b/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java @@ -49,13 +49,16 @@ public class ObservableMessageTest extends CamelTestSupport { } public class MyObservableMessage extends ObservableMessage { + @Override protected void configure(Observable<Message> observable) { // lets process the messages using the RX API observable.map(new Func1<Message, String>() { + @Override public String call(Message message) { return "Hello " + message.getBody(String.class); } }).subscribe(new Action1<String>() { + @Override public void call(String body) { template.sendBody(resultEndpoint, body); } @@ -66,6 +69,7 @@ public class ObservableMessageTest extends CamelTestSupport { @Override protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { + @Override public void configure() { from("direct:start").process(observableMessage); } http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/test/java/org/apache/camel/rx/Order.java ---------------------------------------------------------------------- diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/Order.java b/components/camel-rx/src/test/java/org/apache/camel/rx/Order.java index 8f866ea..5d98e20 100644 --- a/components/camel-rx/src/test/java/org/apache/camel/rx/Order.java +++ b/components/camel-rx/src/test/java/org/apache/camel/rx/Order.java @@ -19,14 +19,15 @@ package org.apache.camel.rx; /** */ 
public class Order { - private String id; - private double amount; + private final String id; + private final double amount; public Order(String id, double amount) { this.amount = amount; this.id = id; } + @Override public String toString() { return "Order[id " + id + ", amount " + amount + "]"; } http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java index 83de696..634c7a9 100644 --- a/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java +++ b/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java @@ -30,7 +30,7 @@ public class SendToTest extends RxTestSupport { Observable<Order> someObservable = Observable.from(expectedBodies); final MockEndpoint mockEndpoint = camelContext.getEndpoint("mock:results", MockEndpoint.class); - mockEndpoint.expectedBodiesReceived(expectedBodies); + mockEndpoint.expectedBodiesReceived((Object[]) expectedBodies); // lets send events on the observable to the camel endpoint reactiveCamel.sendTo(someObservable, "mock:results"); http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableAndMapTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableAndMapTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableAndMapTest.java index e173bcd..b159918 100644 --- a/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableAndMapTest.java +++ b/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableAndMapTest.java @@ -41,6 +41,7 @@ public class ToObservableAndMapTest extends RxTestSupport { // transform the stream 
Observable<String> observable = observableMessage.map(new Func1<Message, String>() { + @Override public String call(Message message) { return "Transformed value: headers " + message.getHeaders(); } http://git-wip-us.apache.org/repos/asf/camel/blob/5d06437e/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableBodyTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableBodyTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableBodyTest.java index f2449c3..6974e8b 100644 --- a/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableBodyTest.java +++ b/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableBodyTest.java @@ -38,10 +38,12 @@ public class ToObservableBodyTest extends RxTestSupport { // lets consume, filter and map events Observable<Order> observable = reactiveCamel.toObservable("seda:orders", Order.class); Observable<String> largeOrderIds = observable.filter(new Func1<Order, Boolean>() { + @Override public Boolean call(Order order) { return order.getAmount() > 100.0; } }).map(new Func1<Order, String>() { + @Override public String call(Order order) { return order.getId(); }