CAMEL-10883: show that payloads with multiple delays are not read; also ensure that the main thread is not spinning on read=0
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/dc42a59b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/dc42a59b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/dc42a59b Branch: refs/heads/master Commit: dc42a59b3eeefc304e7262ece0eb564c71a2f2a4 Parents: c2ffb10 Author: rohan <rohan.h...@fronde.com> Authored: Tue Feb 21 12:45:31 2017 +1300 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Thu Feb 23 10:00:24 2017 +0100 ---------------------------------------------------------------------- .../DefaultUndertowHttpBindingTest.java | 92 +++++++++++++------- 1 file changed, 62 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/dc42a59b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/DefaultUndertowHttpBindingTest.java ---------------------------------------------------------------------- diff --git a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/DefaultUndertowHttpBindingTest.java b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/DefaultUndertowHttpBindingTest.java index 9bd7cc7..797196f 100644 --- a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/DefaultUndertowHttpBindingTest.java +++ b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/DefaultUndertowHttpBindingTest.java @@ -10,56 +10,88 @@ import java.nio.ByteBuffer; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; public class DefaultUndertowHttpBindingTest { - @Test + @Test(timeout = 1000) public void readEntireDelayedPayload() throws Exception { - byte[] 
delayedPayload = "first ".getBytes(); + String[] delayedPayloads = new String[] { + "chunk", + }; - ExecutorService executor = Executors.newFixedThreadPool(1); - StreamSourceChannel source = new EmptyStreamSourceChannel( - new XnioIoThread(null, 0) { - @Override - public void execute(Runnable runnable) { - executor.execute(runnable); - } + StreamSourceChannel source = source(delayedPayloads); - @Override - public Key executeAfter(Runnable runnable, long l, TimeUnit timeUnit) { - execute(runnable); - return null; - } + DefaultUndertowHttpBinding binding = new DefaultUndertowHttpBinding(); + String result = new String(binding.readFromChannel(source)); - @Override - public Key executeAtInterval(Runnable runnable, long l, TimeUnit timeUnit) { - execute(runnable); - return null; - } - }) { + assertThat(result, is(delayedPayloads[0])); + } + + @Test(timeout = 1000) + public void readEntireMultiDelayedPayload() throws Exception { + String[] delayedPayloads = new String[] { + "first ", + "second", + }; + + StreamSourceChannel source = source(delayedPayloads); + + DefaultUndertowHttpBinding binding = new DefaultUndertowHttpBinding(); + String result = new String(binding.readFromChannel(source)); + + assertThat(result, is( + Stream.of(delayedPayloads) + .collect(Collectors.joining()))); + } + + private StreamSourceChannel source(final String[] delayedPayloads) { + XnioIoThread thread = thread(); + Thread sourceThread = Thread.currentThread(); + + return new EmptyStreamSourceChannel(thread) { int chunk = 0; @Override public int read(ByteBuffer dst) throws IOException { - switch (chunk) { - case 0: - chunk++; - return 0; - case 1: + // can only read payloads in the reader thread + if (sourceThread != Thread.currentThread()) { + if (chunk < delayedPayloads.length) { + byte[] delayedPayload = delayedPayloads[chunk].getBytes(); dst.put(delayedPayload); chunk++; - return 6; + return delayedPayload.length; + } + return -1; } - return -1; + return 0; } }; + } - 
DefaultUndertowHttpBinding binding = new DefaultUndertowHttpBinding(); - byte[] result = binding.readFromChannel(source); + private XnioIoThread thread() { + ExecutorService executor = Executors.newFixedThreadPool(1); + return new XnioIoThread(null, 0) { + @Override + public void execute(Runnable runnable) { + executor.execute(runnable); + } + + @Override + public Key executeAfter(Runnable runnable, long l, TimeUnit timeUnit) { + execute(runnable); + return null; + } - assertThat(result, is(delayedPayload)); + @Override + public Key executeAtInterval(Runnable runnable, long l, TimeUnit timeUnit) { + execute(runnable); + return null; + } + }; } } \ No newline at end of file