Repository: camel Updated Branches: refs/heads/master 575033f24 -> 5e9ac8871
CAMEL-11125: handle unbounded streams without refilling Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5e9ac887 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5e9ac887 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5e9ac887 Branch: refs/heads/master Commit: 5e9ac88717145311c92e1e5b65950761b0d90e97 Parents: 575033f Author: Nicola Ferraro <ni.ferr...@gmail.com> Authored: Mon May 8 17:55:55 2017 +0200 Committer: Nicola Ferraro <ni.ferr...@gmail.com> Committed: Mon May 8 17:55:55 2017 +0200 ---------------------------------------------------------------------- .../streams/engine/CamelSubscriber.java | 23 +++-- .../reactive/streams/RequestRefillTest.java | 99 ++++++++++++++++++++ .../reactive/streams/support/TestPublisher.java | 17 ++++ 3 files changed, 130 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/5e9ac887/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java index dba42f0..a2cb232 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java @@ -35,9 +35,9 @@ public class CamelSubscriber implements Subscriber<Exchange>, Closeable { private static final Logger LOG = LoggerFactory.getLogger(CamelSubscriber.class); /** - * Enough to be considered unbounded. Requests are refilled once completed. 
+ * Unbounded as per rule #17. No need to refill. */ - private static final long MAX_INFLIGHT_UNBOUNDED = Long.MAX_VALUE / 2; + private static final long UNBOUNDED_REQUESTS = Long.MAX_VALUE; private ReactiveStreamsConsumer consumer; @@ -104,7 +104,10 @@ public class CamelSubscriber implements Subscriber<Exchange>, Closeable { ReactiveStreamsConsumer target; synchronized (this) { - requested--; + if (requested < UNBOUNDED_REQUESTS) { + // When there are UNBOUNDED_REQUESTS, they remain constant + requested--; + } target = this.consumer; if (target != null) { inflightCount++; @@ -131,12 +134,14 @@ public class CamelSubscriber implements Subscriber<Exchange>, Closeable { synchronized (this) { if (consumer != null && this.subscription != null) { Integer consMax = consumer.getEndpoint().getMaxInflightExchanges(); - long max = (consMax != null && consMax > 0) ? consMax.longValue() : MAX_INFLIGHT_UNBOUNDED; - long newRequest = max - requested - inflightCount; - if (newRequest > 0) { - toBeRequested = newRequest; - requested += toBeRequested; - subs = this.subscription; + long max = (consMax != null && consMax > 0) ? 
consMax.longValue() : UNBOUNDED_REQUESTS; + if (requested < UNBOUNDED_REQUESTS) { + long newRequest = max - requested - inflightCount; + if (newRequest > 0) { + toBeRequested = newRequest; + requested += toBeRequested; + subs = this.subscription; + } } } } http://git-wip-us.apache.org/repos/asf/camel/blob/5e9ac887/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/RequestRefillTest.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/RequestRefillTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/RequestRefillTest.java new file mode 100644 index 0000000..bba814f --- /dev/null +++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/RequestRefillTest.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.reactive.streams; + +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.LongStream; + +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams; +import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService; +import org.apache.camel.component.reactive.streams.support.TestPublisher; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; +import org.reactivestreams.Publisher; + +/** + * Test the number of refill requests that are sent to a publisher from a Camel consumer. + */ +public class RequestRefillTest extends CamelTestSupport { + + @Test + public void testUnboundedRequests() throws Exception { + + final int numReqs = 100; + + List<Long> requests = Collections.synchronizedList(new LinkedList<>()); + Publisher<Long> nums = createPublisher(numReqs, requests); + + MockEndpoint mock = getMockEndpoint("mock:unbounded-endpoint"); + mock.expectedMessageCount(numReqs); + + CamelReactiveStreamsService rxCamel = CamelReactiveStreams.get(context()); + nums.subscribe(rxCamel.streamSubscriber("unbounded", Long.class)); + + mock.assertIsSatisfied(); + assertEquals(1, requests.size()); + assertEquals(Long.MAX_VALUE, requests.get(0).longValue()); + Long sum = mock.getExchanges().stream().map(x -> x.getIn().getBody(Long.class)).reduce((l, r) -> l + r).get(); + assertEquals(numReqs * (numReqs + 1) / 2, sum.longValue()); + } + + @Test + public void testBoundedRequests() throws Exception { + + final int numReqs = 100; + + List<Long> requests = Collections.synchronizedList(new LinkedList<>()); + Publisher<Long> nums = createPublisher(numReqs, requests); + + MockEndpoint mock = getMockEndpoint("mock:bounded-endpoint"); + 
mock.expectedMessageCount(numReqs); + + CamelReactiveStreamsService rxCamel = CamelReactiveStreams.get(context()); + nums.subscribe(rxCamel.streamSubscriber("bounded", Long.class)); + + mock.assertIsSatisfied(); + + assertTrue(requests.size() >= numReqs / 10); + Long sum = mock.getExchanges().stream().map(x -> x.getIn().getBody(Long.class)).reduce((l, r) -> l + r).get(); + assertEquals(numReqs * (numReqs + 1) / 2, sum.longValue()); + } + + private Publisher<Long> createPublisher(final int numReqs, final List<Long> requests) { + return new TestPublisher<>(LongStream.rangeClosed(1, numReqs).boxed().collect(Collectors.toList()), 0, requests::add); + } + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("reactive-streams:unbounded?maxInflightExchanges=-1") + .to("mock:unbounded-endpoint"); + + from("reactive-streams:bounded?maxInflightExchanges=10") + .to("mock:bounded-endpoint"); + } + }; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/5e9ac887/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/TestPublisher.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/TestPublisher.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/TestPublisher.java index 0165bed..c3351cc 100644 --- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/TestPublisher.java +++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/TestPublisher.java @@ -33,13 +33,20 @@ public class TestPublisher<T> implements Publisher<T> { private long delay; + private RequestObserver requestObserver; + public 
TestPublisher(Iterable<T> data) { this(data, 0L); } public TestPublisher(Iterable<T> data, long delay) { + this(data, delay, null); + } + + public TestPublisher(Iterable<T> data, long delay, RequestObserver requestObserver) { this.data = data; this.delay = delay; + this.requestObserver = requestObserver; } @Override @@ -54,6 +61,10 @@ public class TestPublisher<T> implements Publisher<T> { @Override public void request(long l) { + if (requestObserver != null) { + requestObserver.observe(l); + } + this.requested.addAndGet(l); new Thread() { @@ -107,4 +118,10 @@ public class TestPublisher<T> implements Publisher<T> { } }); } + + public interface RequestObserver { + + void observe(long request); + + } }