CAMEL-11125: adding low watermark for refilling exchanges (test)
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/812b98db Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/812b98db Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/812b98db Branch: refs/heads/master Commit: 812b98db83845ef73635e82fc990e5a7f830dcc2 Parents: 5a951a4 Author: Nicola Ferraro <ni.ferr...@gmail.com> Authored: Tue May 9 13:12:44 2017 +0200 Committer: Nicola Ferraro <ni.ferr...@gmail.com> Committed: Tue May 9 16:17:03 2017 +0200 ---------------------------------------------------------------------- .../streams/engine/CamelSubscriber.java | 2 +- .../reactive/streams/RequestRefillTest.java | 71 +++++++++++++++----- 2 files changed, 54 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/812b98db/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java index cf6f374..6df386f 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java @@ -137,7 +137,7 @@ public class CamelSubscriber implements Subscriber<Exchange>, Closeable { long max = (consMax != null && consMax > 0) ? consMax.longValue() : UNBOUNDED_REQUESTS; if (requested < UNBOUNDED_REQUESTS) { long lowWatermark = Math.max(0, Math.round(consumer.getEndpoint().getExchangesRefillLowWatermark() * max)); - long minRequests = Math.max(max, max - lowWatermark); + long minRequests = Math.min(max, max - lowWatermark); long newRequest = max - requested - inflightCount; if (newRequest > 0 && newRequest >= minRequests) { toBeRequested = newRequest; http://git-wip-us.apache.org/repos/asf/camel/blob/812b98db/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/RequestRefillTest.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/RequestRefillTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/RequestRefillTest.java index bba814f..c95970e 100644 --- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/RequestRefillTest.java +++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/RequestRefillTest.java @@ -39,44 +39,57 @@ public class RequestRefillTest extends CamelTestSupport { @Test public void testUnboundedRequests() throws Exception { + int numReqs = 100; + List<Long> requests = executeTest("unbounded", numReqs); + assertEquals(1, requests.size()); + assertEquals(Long.MAX_VALUE, requests.get(0).longValue()); + } - final int numReqs = 100; - - List<Long> requests = Collections.synchronizedList(new LinkedList<>()); - Publisher<Long> nums = createPublisher(numReqs, requests); - - MockEndpoint mock = getMockEndpoint("mock:unbounded-endpoint"); - mock.expectedMessageCount(numReqs); - - CamelReactiveStreamsService rxCamel = CamelReactiveStreams.get(context()); - nums.subscribe(rxCamel.streamSubscriber("unbounded", Long.class)); - - mock.assertIsSatisfied(); + @Test + public void testUnboundedRequestsWatermarkNoEffect() throws Exception { + int numReqs = 100; + List<Long> requests = executeTest("unbounded-100", numReqs); assertEquals(1, requests.size()); assertEquals(Long.MAX_VALUE, requests.get(0).longValue()); - Long sum = mock.getExchanges().stream().map(x -> x.getIn().getBody(Long.class)).reduce((l, r) -> l + r).get(); - assertEquals(numReqs * (numReqs + 1) / 2, sum.longValue()); } @Test public void testBoundedRequests() throws Exception { + int numReqs = 100; + List<Long> requests = executeTest("bounded", numReqs); + assertTrue(requests.size() >= numReqs / 10); + } - final int numReqs = 100; + @Test + public void testBoundedRequestsPercentageRefill() throws Exception { + int numReqs = 120; + List<Long> requests0 = executeTest("bounded-0", numReqs); + List<Long> requests10 = executeTest("bounded-10", numReqs); + List<Long> requests25 = executeTest("bounded", numReqs); + List<Long> requests80 = executeTest("bounded-80", numReqs); + List<Long> requests100 = executeTest("bounded-100", numReqs); + + assertTrue(requests0.size() <= requests10.size()); // too close + assertTrue(requests10.size() < requests25.size()); + assertTrue(requests25.size() < requests80.size()); + assertTrue(requests80.size() < requests100.size()); + } + private List<Long> executeTest(String name, int numReqs) throws InterruptedException { List<Long> requests = Collections.synchronizedList(new LinkedList<>()); Publisher<Long> nums = createPublisher(numReqs, requests); - MockEndpoint mock = getMockEndpoint("mock:bounded-endpoint"); + MockEndpoint mock = getMockEndpoint("mock:" + name + "-endpoint"); mock.expectedMessageCount(numReqs); CamelReactiveStreamsService rxCamel = CamelReactiveStreams.get(context()); - nums.subscribe(rxCamel.streamSubscriber("bounded", Long.class)); + nums.subscribe(rxCamel.streamSubscriber(name, Long.class)); mock.assertIsSatisfied(); - assertTrue(requests.size() >= numReqs / 10); Long sum = mock.getExchanges().stream().map(x -> x.getIn().getBody(Long.class)).reduce((l, r) -> l + r).get(); assertEquals(numReqs * (numReqs + 1) / 2, sum.longValue()); + return requests; } private Publisher<Long> createPublisher(final int numReqs, final List<Long> requests) { @@ -89,10 +102,32 @@ public class RequestRefillTest extends CamelTestSupport { @Override public void configure() throws Exception { from("reactive-streams:unbounded?maxInflightExchanges=-1") + .delayer(1) .to("mock:unbounded-endpoint"); + from("reactive-streams:unbounded-100?maxInflightExchanges=-1&exchangesRefillLowWatermark=1") + .delayer(1) + .to("mock:unbounded-100-endpoint"); + from("reactive-streams:bounded?maxInflightExchanges=10") + .delayer(1) .to("mock:bounded-endpoint"); + + from("reactive-streams:bounded-0?maxInflightExchanges=10&exchangesRefillLowWatermark=0") + .delayer(1) + .to("mock:bounded-0-endpoint"); + + from("reactive-streams:bounded-10?maxInflightExchanges=10&exchangesRefillLowWatermark=0.1") + .delayer(1) + .to("mock:bounded-10-endpoint"); + + from("reactive-streams:bounded-80?maxInflightExchanges=10&exchangesRefillLowWatermark=0.8") + .delayer(1) + .to("mock:bounded-80-endpoint"); + + from("reactive-streams:bounded-100?maxInflightExchanges=10&exchangesRefillLowWatermark=1") + .delayer(1) + .to("mock:bounded-100-endpoint"); } }; }