CAMEL-11125: adding low watermark for refilling exchanges (test)

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/812b98db
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/812b98db
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/812b98db

Branch: refs/heads/master
Commit: 812b98db83845ef73635e82fc990e5a7f830dcc2
Parents: 5a951a4
Author: Nicola Ferraro <ni.ferr...@gmail.com>
Authored: Tue May 9 13:12:44 2017 +0200
Committer: Nicola Ferraro <ni.ferr...@gmail.com>
Committed: Tue May 9 16:17:03 2017 +0200

----------------------------------------------------------------------
 .../streams/engine/CamelSubscriber.java         |  2 +-
 .../reactive/streams/RequestRefillTest.java     | 71 +++++++++++++++-----
 2 files changed, 54 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/812b98db/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
index cf6f374..6df386f 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
@@ -137,7 +137,7 @@ public class CamelSubscriber implements Subscriber<Exchange>, Closeable {
                 long max = (consMax != null && consMax > 0) ? consMax.longValue() : UNBOUNDED_REQUESTS;
                 if (requested < UNBOUNDED_REQUESTS) {
                     long lowWatermark = Math.max(0, Math.round(consumer.getEndpoint().getExchangesRefillLowWatermark() * max));
-                    long minRequests = Math.max(max, max - lowWatermark);
+                    long minRequests = Math.min(max, max - lowWatermark);
                     long newRequest = max - requested - inflightCount;
                     if (newRequest > 0 && newRequest >= minRequests) {
                         toBeRequested = newRequest;

http://git-wip-us.apache.org/repos/asf/camel/blob/812b98db/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/RequestRefillTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/RequestRefillTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/RequestRefillTest.java
index bba814f..c95970e 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/RequestRefillTest.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/RequestRefillTest.java
@@ -39,44 +39,57 @@ public class RequestRefillTest extends CamelTestSupport {
 
     @Test
     public void testUnboundedRequests() throws Exception {
+        int numReqs = 100;
+        List<Long> requests = executeTest("unbounded", numReqs);
+        assertEquals(1, requests.size());
+        assertEquals(Long.MAX_VALUE, requests.get(0).longValue());
+    }
 
-        final int numReqs = 100;
-
-        List<Long> requests = Collections.synchronizedList(new LinkedList<>());
-        Publisher<Long> nums = createPublisher(numReqs, requests);
-
-        MockEndpoint mock = getMockEndpoint("mock:unbounded-endpoint");
-        mock.expectedMessageCount(numReqs);
-
-        CamelReactiveStreamsService rxCamel = CamelReactiveStreams.get(context());
-        nums.subscribe(rxCamel.streamSubscriber("unbounded", Long.class));
-
-        mock.assertIsSatisfied();
+    @Test
+    public void testUnboundedRequestsWatermarkNoEffect() throws Exception {
+        int numReqs = 100;
+        List<Long> requests = executeTest("unbounded-100", numReqs);
         assertEquals(1, requests.size());
         assertEquals(Long.MAX_VALUE, requests.get(0).longValue());
-        Long sum = mock.getExchanges().stream().map(x -> x.getIn().getBody(Long.class)).reduce((l, r) -> l + r).get();
-        assertEquals(numReqs * (numReqs + 1) / 2, sum.longValue());
     }
 
     @Test
     public void testBoundedRequests() throws Exception {
+        int numReqs = 100;
+        List<Long> requests = executeTest("bounded", numReqs);
+        assertTrue(requests.size() >= numReqs / 10);
+    }
 
-        final int numReqs = 100;
+    @Test
+    public void testBoundedRequestsPercentageRefill() throws Exception {
+        int numReqs = 120;
+        List<Long> requests0 = executeTest("bounded-0", numReqs);
+        List<Long> requests10 = executeTest("bounded-10", numReqs);
+        List<Long> requests25 = executeTest("bounded", numReqs);
+        List<Long> requests80 = executeTest("bounded-80", numReqs);
+        List<Long> requests100 = executeTest("bounded-100", numReqs);
+
+        assertTrue(requests0.size() <= requests10.size()); // too close
+        assertTrue(requests10.size() < requests25.size());
+        assertTrue(requests25.size() < requests80.size());
+        assertTrue(requests80.size() < requests100.size());
+    }
 
+    private List<Long> executeTest(String name, int numReqs) throws InterruptedException {
         List<Long> requests = Collections.synchronizedList(new LinkedList<>());
         Publisher<Long> nums = createPublisher(numReqs, requests);
 
-        MockEndpoint mock = getMockEndpoint("mock:bounded-endpoint");
+        MockEndpoint mock = getMockEndpoint("mock:" + name + "-endpoint");
         mock.expectedMessageCount(numReqs);
 
         CamelReactiveStreamsService rxCamel = CamelReactiveStreams.get(context());
-        nums.subscribe(rxCamel.streamSubscriber("bounded", Long.class));
+        nums.subscribe(rxCamel.streamSubscriber(name, Long.class));
 
         mock.assertIsSatisfied();
 
-        assertTrue(requests.size() >= numReqs / 10);
         Long sum = mock.getExchanges().stream().map(x -> x.getIn().getBody(Long.class)).reduce((l, r) -> l + r).get();
         assertEquals(numReqs * (numReqs + 1) / 2, sum.longValue());
+        return requests;
     }
 
     private Publisher<Long> createPublisher(final int numReqs, final List<Long> requests) {
@@ -89,10 +102,32 @@ public class RequestRefillTest extends CamelTestSupport {
             @Override
             public void configure() throws Exception {
                 from("reactive-streams:unbounded?maxInflightExchanges=-1")
+                        .delayer(1)
                         .to("mock:unbounded-endpoint");
 
+                from("reactive-streams:unbounded-100?maxInflightExchanges=-1&exchangesRefillLowWatermark=1")
+                        .delayer(1)
+                        .to("mock:unbounded-100-endpoint");
+
                 from("reactive-streams:bounded?maxInflightExchanges=10")
+                        .delayer(1)
                         .to("mock:bounded-endpoint");
+
+                from("reactive-streams:bounded-0?maxInflightExchanges=10&exchangesRefillLowWatermark=0")
+                        .delayer(1)
+                        .to("mock:bounded-0-endpoint");
+
+                from("reactive-streams:bounded-10?maxInflightExchanges=10&exchangesRefillLowWatermark=0.1")
+                        .delayer(1)
+                        .to("mock:bounded-10-endpoint");
+
+                from("reactive-streams:bounded-80?maxInflightExchanges=10&exchangesRefillLowWatermark=0.8")
+                        .delayer(1)
+                        .to("mock:bounded-80-endpoint");
+
+                from("reactive-streams:bounded-100?maxInflightExchanges=10&exchangesRefillLowWatermark=1")
+                        .delayer(1)
+                        .to("mock:bounded-100-endpoint");
             }
         };
     }

Reply via email to