Repository: camel
Updated Branches:
  refs/heads/master 575033f24 -> 5e9ac8871


CAMEL-11125: handle unbounded streams without refilling


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5e9ac887
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5e9ac887
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5e9ac887

Branch: refs/heads/master
Commit: 5e9ac88717145311c92e1e5b65950761b0d90e97
Parents: 575033f
Author: Nicola Ferraro <ni.ferr...@gmail.com>
Authored: Mon May 8 17:55:55 2017 +0200
Committer: Nicola Ferraro <ni.ferr...@gmail.com>
Committed: Mon May 8 17:55:55 2017 +0200

----------------------------------------------------------------------
 .../streams/engine/CamelSubscriber.java         | 23 +++--
 .../reactive/streams/RequestRefillTest.java     | 99 ++++++++++++++++++++
 .../reactive/streams/support/TestPublisher.java | 17 ++++
 3 files changed, 130 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/5e9ac887/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
----------------------------------------------------------------------
diff --git 
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
 
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
index dba42f0..a2cb232 100644
--- 
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
+++ 
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
@@ -35,9 +35,9 @@ public class CamelSubscriber implements Subscriber<Exchange>, 
Closeable {
     private static final Logger LOG = 
LoggerFactory.getLogger(CamelSubscriber.class);
 
     /**
-     * Enough to be considered unbounded. Requests are refilled once completed.
+     * Unbounded as per rule #17. No need to refill.
      */
-    private static final long MAX_INFLIGHT_UNBOUNDED = Long.MAX_VALUE / 2;
+    private static final long UNBOUNDED_REQUESTS = Long.MAX_VALUE;
 
     private ReactiveStreamsConsumer consumer;
 
@@ -104,7 +104,10 @@ public class CamelSubscriber implements 
Subscriber<Exchange>, Closeable {
 
         ReactiveStreamsConsumer target;
         synchronized (this) {
-            requested--;
+            if (requested < UNBOUNDED_REQUESTS) {
+                // When there are UNBOUNDED_REQUESTS, they remain constant
+                requested--;
+            }
             target = this.consumer;
             if (target != null) {
                 inflightCount++;
@@ -131,12 +134,14 @@ public class CamelSubscriber implements 
Subscriber<Exchange>, Closeable {
         synchronized (this) {
             if (consumer != null && this.subscription != null) {
                 Integer consMax = 
consumer.getEndpoint().getMaxInflightExchanges();
-                long max = (consMax != null && consMax > 0) ? 
consMax.longValue() : MAX_INFLIGHT_UNBOUNDED;
-                long newRequest = max - requested - inflightCount;
-                if (newRequest > 0) {
-                    toBeRequested = newRequest;
-                    requested += toBeRequested;
-                    subs = this.subscription;
+                long max = (consMax != null && consMax > 0) ? 
consMax.longValue() : UNBOUNDED_REQUESTS;
+                if (requested < UNBOUNDED_REQUESTS) {
+                    long newRequest = max - requested - inflightCount;
+                    if (newRequest > 0) {
+                        toBeRequested = newRequest;
+                        requested += toBeRequested;
+                        subs = this.subscription;
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/5e9ac887/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/RequestRefillTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/RequestRefillTest.java
 
b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/RequestRefillTest.java
new file mode 100644
index 0000000..bba814f
--- /dev/null
+++ 
b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/RequestRefillTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
+import 
org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
+import org.apache.camel.component.reactive.streams.support.TestPublisher;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+import org.reactivestreams.Publisher;
+
+/**
+ * Test the number of refill requests that are sent to a published from a 
Camel consumer.
+ */
+public class RequestRefillTest extends CamelTestSupport {
+
+    @Test
+    public void testUnboundedRequests() throws Exception {
+
+        final int numReqs = 100;
+
+        List<Long> requests = Collections.synchronizedList(new LinkedList<>());
+        Publisher<Long> nums = createPublisher(numReqs, requests);
+
+        MockEndpoint mock = getMockEndpoint("mock:unbounded-endpoint");
+        mock.expectedMessageCount(numReqs);
+
+        CamelReactiveStreamsService rxCamel = 
CamelReactiveStreams.get(context());
+        nums.subscribe(rxCamel.streamSubscriber("unbounded", Long.class));
+
+        mock.assertIsSatisfied();
+        assertEquals(1, requests.size());
+        assertEquals(Long.MAX_VALUE, requests.get(0).longValue());
+        Long sum = mock.getExchanges().stream().map(x -> 
x.getIn().getBody(Long.class)).reduce((l, r) -> l + r).get();
+        assertEquals(numReqs * (numReqs + 1) / 2, sum.longValue());
+    }
+
+    @Test
+    public void testBoundedRequests() throws Exception {
+
+        final int numReqs = 100;
+
+        List<Long> requests = Collections.synchronizedList(new LinkedList<>());
+        Publisher<Long> nums = createPublisher(numReqs, requests);
+
+        MockEndpoint mock = getMockEndpoint("mock:bounded-endpoint");
+        mock.expectedMessageCount(numReqs);
+
+        CamelReactiveStreamsService rxCamel = 
CamelReactiveStreams.get(context());
+        nums.subscribe(rxCamel.streamSubscriber("bounded", Long.class));
+
+        mock.assertIsSatisfied();
+
+        assertTrue(requests.size() >= numReqs / 10);
+        Long sum = mock.getExchanges().stream().map(x -> 
x.getIn().getBody(Long.class)).reduce((l, r) -> l + r).get();
+        assertEquals(numReqs * (numReqs + 1) / 2, sum.longValue());
+    }
+
+    private Publisher<Long> createPublisher(final int numReqs, final 
List<Long> requests) {
+        return new TestPublisher<>(LongStream.rangeClosed(1, 
numReqs).boxed().collect(Collectors.toList()), 0, requests::add);
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("reactive-streams:unbounded?maxInflightExchanges=-1")
+                        .to("mock:unbounded-endpoint");
+
+                from("reactive-streams:bounded?maxInflightExchanges=10")
+                        .to("mock:bounded-endpoint");
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/5e9ac887/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/TestPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/TestPublisher.java
 
b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/TestPublisher.java
index 0165bed..c3351cc 100644
--- 
a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/TestPublisher.java
+++ 
b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/TestPublisher.java
@@ -33,13 +33,20 @@ public class TestPublisher<T> implements Publisher<T> {
 
     private long delay;
 
+    private RequestObserver requestObserver;
+
     public TestPublisher(Iterable<T> data) {
         this(data, 0L);
     }
 
     public TestPublisher(Iterable<T> data, long delay) {
+        this(data, delay, null);
+    }
+
+    public TestPublisher(Iterable<T> data, long delay, RequestObserver 
requestObserver) {
         this.data = data;
         this.delay = delay;
+        this.requestObserver = requestObserver;
     }
 
     @Override
@@ -54,6 +61,10 @@ public class TestPublisher<T> implements Publisher<T> {
 
             @Override
             public void request(long l) {
+                if (requestObserver != null) {
+                    requestObserver.observe(l);
+                }
+
                 this.requested.addAndGet(l);
 
                 new Thread() {
@@ -107,4 +118,10 @@ public class TestPublisher<T> implements Publisher<T> {
             }
         });
     }
+
+    public interface RequestObserver {
+
+        void observe(long request);
+
+    }
 }

Reply via email to