CAMEL-10612: fix delayed publisher issue on late request
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8195f3ed Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8195f3ed Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8195f3ed Branch: refs/heads/master Commit: 8195f3ed512d48684482b90900b449a1f40f56ce Parents: dd25dcc Author: Nicola Ferraro <ni.ferr...@gmail.com> Authored: Fri Feb 3 17:14:11 2017 +0100 Committer: Nicola Ferraro <ni.ferr...@gmail.com> Committed: Fri Feb 3 18:41:41 2017 +0100 ---------------------------------------------------------------------- .../streams/engine/DelayedMonoPublisher.java | 8 ++++-- .../streams/DelayedMonoPublisherTest.java | 28 ++++++++++++++++++++ 2 files changed, 34 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/8195f3ed/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java index ef8ece1..106ab4f 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java @@ -138,8 +138,11 @@ public class DelayedMonoPublisher<T> implements Publisher<T> { @Override public void request(long l) { - if (terminated) { - throw new IllegalStateException("The subscription is terminated"); + synchronized (this) { + if (terminated) { + // just ignore the request + return; + } } if (l <= 0) { @@ -153,6 +156,7 @@ public class DelayedMonoPublisher<T> implements Publisher<T> { } } + flushCycle(); } public void flush() { http://git-wip-us.apache.org/repos/asf/camel/blob/8195f3ed/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DelayedMonoPublisherTest.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DelayedMonoPublisherTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DelayedMonoPublisherTest.java index 011225e..9361473 100644 --- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DelayedMonoPublisherTest.java +++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DelayedMonoPublisherTest.java @@ -17,15 +17,18 @@ package org.apache.camel.component.reactive.streams; import java.util.LinkedList; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import io.reactivex.Flowable; import org.apache.camel.component.reactive.streams.engine.DelayedMonoPublisher; +import org.apache.camel.component.reactive.streams.support.TestSubscriber; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -224,6 +227,31 @@ public class DelayedMonoPublisherTest { } } + @Test + public void testDelayedRequest() throws Exception { + + DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service); + pub.setData(2); + + BlockingQueue<Integer> queue = new LinkedBlockingDeque<>(); + + TestSubscriber<Integer> sub = new TestSubscriber<Integer>() { + @Override + public void onNext(Integer o) { + queue.add(o); + } + }; + sub.setInitiallyRequested(0); + + pub.subscribe(sub); + + Thread.sleep(100); + sub.request(1); + + Integer res = queue.poll(1, TimeUnit.SECONDS); + assertEquals(new Integer(2), res); + } + @Test(expected = IllegalStateException.class) public void testDataOrExceptionAllowed() throws Exception { DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);