CAMEL-10883: show that payloads with multiple delays are not read
also ensure that the main thread is not spinning on read=0


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/dc42a59b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/dc42a59b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/dc42a59b

Branch: refs/heads/master
Commit: dc42a59b3eeefc304e7262ece0eb564c71a2f2a4
Parents: c2ffb10
Author: rohan <rohan.h...@fronde.com>
Authored: Tue Feb 21 12:45:31 2017 +1300
Committer: Andrea Cosentino <anco...@gmail.com>
Committed: Thu Feb 23 10:00:24 2017 +0100

----------------------------------------------------------------------
 .../DefaultUndertowHttpBindingTest.java         | 92 +++++++++++++-------
 1 file changed, 62 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/dc42a59b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/DefaultUndertowHttpBindingTest.java
----------------------------------------------------------------------
diff --git a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/DefaultUndertowHttpBindingTest.java b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/DefaultUndertowHttpBindingTest.java
index 9bd7cc7..797196f 100644
--- a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/DefaultUndertowHttpBindingTest.java
+++ b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/DefaultUndertowHttpBindingTest.java
@@ -10,56 +10,88 @@ import java.nio.ByteBuffer;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
 public class DefaultUndertowHttpBindingTest {
 
-    @Test
+    @Test(timeout = 1000)
     public void readEntireDelayedPayload() throws Exception {
-        byte[] delayedPayload = "first ".getBytes();
+        String[] delayedPayloads = new String[] {
+                "chunk",
+        };
 
-        ExecutorService executor = Executors.newFixedThreadPool(1);
-        StreamSourceChannel source = new EmptyStreamSourceChannel(
-                new XnioIoThread(null, 0) {
-                    @Override
-                    public void execute(Runnable runnable) {
-                        executor.execute(runnable);
-                    }
+        StreamSourceChannel source = source(delayedPayloads);
 
-                    @Override
-                    public Key executeAfter(Runnable runnable, long l, TimeUnit timeUnit) {
-                        execute(runnable);
-                        return null;
-                    }
+        DefaultUndertowHttpBinding binding = new DefaultUndertowHttpBinding();
+        String result = new String(binding.readFromChannel(source));
 
-                    @Override
-                    public Key executeAtInterval(Runnable runnable, long l, TimeUnit timeUnit) {
-                        execute(runnable);
-                        return null;
-                    }
-                }) {
+        assertThat(result, is(delayedPayloads[0]));
+    }
+
+    @Test(timeout = 1000)
+    public void readEntireMultiDelayedPayload() throws Exception {
+        String[] delayedPayloads = new String[] {
+                "first ",
+                "second",
+        };
+
+        StreamSourceChannel source = source(delayedPayloads);
+
+        DefaultUndertowHttpBinding binding = new DefaultUndertowHttpBinding();
+        String result = new String(binding.readFromChannel(source));
+
+        assertThat(result, is(
+                Stream.of(delayedPayloads)
+                        .collect(Collectors.joining())));
+    }
+
+    private StreamSourceChannel source(final String[] delayedPayloads) {
+        XnioIoThread thread = thread();
+        Thread sourceThread = Thread.currentThread();
+
+        return new EmptyStreamSourceChannel(thread) {
             int chunk = 0;
 
             @Override
             public int read(ByteBuffer dst) throws IOException {
-                switch (chunk) {
-                    case 0:
-                        chunk++;
-                        return 0;
-                    case 1:
+                // can only read payloads in the reader thread
+                if (sourceThread != Thread.currentThread()) {
+                    if (chunk < delayedPayloads.length) {
+                        byte[] delayedPayload = delayedPayloads[chunk].getBytes();
                         dst.put(delayedPayload);
                         chunk++;
-                        return 6;
+                        return delayedPayload.length;
+                    }
+                    return -1;
                 }
-                return -1;
+                return 0;
             }
         };
+    }
 
-        DefaultUndertowHttpBinding binding = new DefaultUndertowHttpBinding();
-        byte[] result = binding.readFromChannel(source);
+    private XnioIoThread thread() {
+        ExecutorService executor = Executors.newFixedThreadPool(1);
+        return new XnioIoThread(null, 0) {
+            @Override
+            public void execute(Runnable runnable) {
+                executor.execute(runnable);
+            }
+
+            @Override
+            public Key executeAfter(Runnable runnable, long l, TimeUnit timeUnit) {
+                execute(runnable);
+                return null;
+            }
 
-        assertThat(result, is(delayedPayload));
+            @Override
+            public Key executeAtInterval(Runnable runnable, long l, TimeUnit timeUnit) {
+                execute(runnable);
+                return null;
+            }
+        };
     }
 }
\ No newline at end of file

Reply via email to