Repository: camel Updated Branches: refs/heads/master fe4af678b -> e53b17117
CAMEL-11125: changing test impl Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e53b1711 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e53b1711 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e53b1711 Branch: refs/heads/master Commit: e53b17117de3b730dcc0a2761f89902b965968b6 Parents: fe4af67 Author: Nicola Ferraro <ni.ferr...@gmail.com> Authored: Tue May 9 16:36:31 2017 +0200 Committer: Nicola Ferraro <ni.ferr...@gmail.com> Committed: Tue May 9 16:36:31 2017 +0200 ---------------------------------------------------------------------- .../reactive/streams/RequestRefillTest.java | 7 +++---- .../reactive/streams/support/TestPublisher.java | 16 ---------------- 2 files changed, 3 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e53b1711/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/RequestRefillTest.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/RequestRefillTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/RequestRefillTest.java index c95970e..ea7d450 100644 --- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/RequestRefillTest.java +++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/RequestRefillTest.java @@ -19,19 +19,18 @@ package org.apache.camel.component.reactive.streams; import java.util.Collections; import java.util.LinkedList; import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.LongStream; import org.apache.camel.RoutesBuilder; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams; import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService; -import org.apache.camel.component.reactive.streams.support.TestPublisher; import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Test; import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; + /** * Test the number of refill requests that are sent to a published from a Camel consumer. */ @@ -93,7 +92,7 @@ public class RequestRefillTest extends CamelTestSupport { } private Publisher<Long> createPublisher(final int numReqs, final List<Long> requests) { - return new TestPublisher<>(LongStream.rangeClosed(1, numReqs).boxed().collect(Collectors.toList()), 0, requests::add); + return Flux.range(1, numReqs).map(Long::valueOf).doOnRequest(requests::add); } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/e53b1711/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/TestPublisher.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/TestPublisher.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/TestPublisher.java index c3351cc..5c317ac 100644 --- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/TestPublisher.java +++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/TestPublisher.java @@ -33,20 +33,13 @@ public class TestPublisher<T> implements Publisher<T> { private long delay; - private RequestObserver requestObserver; - public TestPublisher(Iterable<T> data) { this(data, 0L); } public TestPublisher(Iterable<T> data, long delay) { - this(data, delay, null); - } - - public TestPublisher(Iterable<T> data, long delay, RequestObserver requestObserver) { this.data = data; this.delay = delay; - this.requestObserver = requestObserver; } @Override @@ -61,10 +54,6 @@ public class TestPublisher<T> implements Publisher<T> { @Override public void request(long l) { - if (requestObserver != null) { - requestObserver.observe(l); - } - this.requested.addAndGet(l); new Thread() { @@ -119,9 +108,4 @@ public class TestPublisher<T> implements Publisher<T> { }); } - public interface RequestObserver { - - void observe(long request); - - } }