Repository: camel
Updated Branches:
  refs/heads/master 8d72cb64f -> 3ec500cb4


Upgraded RX java


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3ec500cb
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3ec500cb
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3ec500cb

Branch: refs/heads/master
Commit: 3ec500cb431e134b716a754e30668dc9346b0aa8
Parents: 8d72cb6
Author: Claus Ibsen <davscl...@apache.org>
Authored: Wed Mar 12 08:18:14 2014 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed Mar 12 08:18:14 2014 +0100

----------------------------------------------------------------------
 .../java/org/apache/camel/rx/ReactiveCamel.java | 12 +----
 .../camel/rx/support/EndpointObservable.java    |  3 +-
 .../camel/rx/support/EndpointSubscribeFunc.java | 46 ++++++++++++++++++++
 .../camel/rx/support/EndpointSubscription.java  | 25 +++++++----
 parent/pom.xml                                  |  2 +-
 5 files changed, 67 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3ec500cb/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java 
b/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
index 5180bb4..b104ebf 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
@@ -21,15 +21,12 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.rx.support.EndpointObservable;
-import org.apache.camel.rx.support.EndpointSubscription;
+import org.apache.camel.rx.support.EndpointSubscribeFunc;
 import org.apache.camel.rx.support.ExchangeToBodyFunc1;
 import org.apache.camel.rx.support.ExchangeToMessageFunc1;
 import org.apache.camel.rx.support.ObserverSender;
 import org.apache.camel.util.CamelContextHelper;
-
 import rx.Observable;
-import rx.Observer;
-import rx.Subscription;
 import rx.util.functions.Func1;
 
 /**
@@ -109,12 +106,7 @@ public class ReactiveCamel {
      */
     protected <T> Observable<T> createEndpointObservable(final Endpoint 
endpoint,
                                                          final Func1<Exchange, 
T> converter) {
-        Observable.OnSubscribeFunc<T> func = new 
Observable.OnSubscribeFunc<T>() {
-            @Override
-            public Subscription onSubscribe(Observer<? super T> observer) {
-                return new EndpointSubscription<T>(endpoint, observer, 
converter);
-            }
-        };
+        Observable.OnSubscribe<T> func = new 
EndpointSubscribeFunc<T>(endpoint, converter);
         return new EndpointObservable<T>(endpoint, func);
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/3ec500cb/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointObservable.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointObservable.java
 
b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointObservable.java
index da74d09..c0010b2 100644
--- 
a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointObservable.java
+++ 
b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointObservable.java
@@ -17,7 +17,6 @@
 package org.apache.camel.rx.support;
 
 import org.apache.camel.Endpoint;
-
 import rx.Observable;
 
 /**
@@ -26,7 +25,7 @@ import rx.Observable;
 public class EndpointObservable<T> extends Observable<T> {
     private final Endpoint endpoint;
 
-    public EndpointObservable(Endpoint endpoint, OnSubscribeFunc<T> func) {
+    public EndpointObservable(Endpoint endpoint, final OnSubscribe<T> func) {
         super(func);
         this.endpoint = endpoint;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/3ec500cb/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscribeFunc.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscribeFunc.java
 
b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscribeFunc.java
new file mode 100644
index 0000000..7e6cf17
--- /dev/null
+++ 
b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscribeFunc.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx.support;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import rx.Observable;
+import rx.Observer;
+import rx.Subscriber;
+import rx.Subscription;
+import rx.util.functions.Func1;
+
+public class EndpointSubscribeFunc<T> implements 
Observable.OnSubscribeFunc<T>, Observable.OnSubscribe<T> {
+
+    private final Endpoint endpoint;
+    private final Func1<Exchange, T> converter;
+
+    public EndpointSubscribeFunc(Endpoint endpoint, Func1<Exchange, T> 
converter) {
+        this.endpoint = endpoint;
+        this.converter = converter;
+    }
+
+    @Override
+    public Subscription onSubscribe(Observer<? super T> observer) {
+        return new EndpointSubscription<T>(endpoint, observer, converter);
+    }
+
+    @Override
+    public void call(Subscriber<? super T> subscriber) {
+        onSubscribe(subscriber);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3ec500cb/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
 
b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
index 349a898..8769561 100644
--- 
a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
+++ 
b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.rx.support;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -33,6 +35,7 @@ public class EndpointSubscription<T> implements Subscription {
     private final Endpoint endpoint;
     private final Observer<? super T> observer;
     private Consumer consumer;
+    private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
 
     public EndpointSubscription(Endpoint endpoint, final Observer<? super T> 
observer,
                                 final Func1<Exchange, T> func) {
@@ -56,18 +59,24 @@ public class EndpointSubscription<T> implements 
Subscription {
 
     @Override
     public void unsubscribe() {
-        if (consumer != null) {
-            try {
-                ServiceHelper.stopServices(consumer);
-
-                // TODO should this fire the observer.onComplete()?
-                observer.onCompleted();
-            } catch (Exception e) {
-                observer.onError(e);
+        if (unsubscribed.compareAndSet(false, true)) {
+            if (consumer != null) {
+                try {
+                    ServiceHelper.stopServices(consumer);
+                    // TODO should this fire the observer.onComplete()?
+                    observer.onCompleted();
+                } catch (Exception e) {
+                    observer.onError(e);
+                }
             }
         }
     }
 
+    @Override
+    public boolean isUnsubscribed() {
+        return unsubscribed.get();
+    }
+
     public Endpoint getEndpoint() {
         return endpoint;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/3ec500cb/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index dbabd85..149bbd3 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -346,7 +346,7 @@
     <rhino-version>1.7R2</rhino-version>
     <rome-bundle-version>1.0_3</rome-bundle-version>
     <rome-version>1.0</rome-version>
-    <rxjava-version>0.16.1</rxjava-version>
+    <rxjava-version>0.17.0</rxjava-version>
     <saaj-impl-version>1.3.2_2</saaj-impl-version>
     <saxon-bundle-version>9.5.1-4_1</saxon-bundle-version>
     <saxon-version>9.5.1-4</saxon-version>

Reply via email to