This is an automated email from the ASF dual-hosted git repository.

tsato pushed a commit to branch camel-2.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.x by this push:
     new bc4755c  CAMEL-13734: camel-undertow - Fix one-way streaming for producer with tests
bc4755c is described below

commit bc4755c25aae841d66519525d68a57f39f441472
Author: Tadayoshi Sato <sato.tadayo...@gmail.com>
AuthorDate: Thu Jul 18 13:41:33 2019 +0900

    CAMEL-13734: camel-undertow - Fix one-way streaming for producer with tests
    
    (cherry picked from commit 5190b410c85a8169ee6a7eb24db8df4ac050c979)
---
 .../component/undertow/UndertowClientCallback.java |  18 ++--
 .../camel/component/undertow/UndertowProducer.java |  13 ++-
 .../undertow/UndertowStreamingClientCallback.java  |  26 ++++-
 .../undertow/UndertowHttpStreamingTest.java        | 119 +++++++++++++++++++++
 4 files changed, 163 insertions(+), 13 deletions(-)

diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowClientCallback.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowClientCallback.java
index 21a4171..4eac8e7 100644
--- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowClientCallback.java
+++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowClientCallback.java
@@ -100,15 +100,15 @@ class UndertowClientCallback implements ClientCallback<ClientConnection> {
 
     protected final ClientRequest request;
 
-    private final ByteBuffer body;
-
-    private final AsyncCallback callback;
+    protected final AsyncCallback callback;
 
     /**
      * A queue of resources that will be closed when the exchange ends, add more
      * resources via {@link #deferClose(Closeable)}.
      */
-    private final BlockingDeque<Closeable> closables = new LinkedBlockingDeque<>();
+    protected final BlockingDeque<Closeable> closables = new LinkedBlockingDeque<>();
+
+    private final ByteBuffer body;
 
     private final Boolean throwExceptionOnFailure;
 
@@ -161,9 +161,13 @@ class UndertowClientCallback implements ClientCallback<ClientConnection> {
         }
     }
 
-    void finish(final Message result) {
-        for (final Closeable closeable : closables) {
-            IoUtils.safeClose(closeable);
+    protected void finish(final Message result) {
+        finish(result, true);
+    }
+
+    protected void finish(final Message result, boolean close) {
+        if (close) {
+            closables.forEach(IoUtils::safeClose);
         }
 
         if (result != null) {
diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java
index 279a350..2a7a5bd 100644
--- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java
+++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java
@@ -141,7 +141,8 @@ public class UndertowProducer extends DefaultAsyncProducer {
 
         final Object body = undertowHttpBinding.toHttpRequest(request, camelExchange.getIn());
         final UndertowClientCallback clientCallback;
-        if (getEndpoint().isUseStreaming() && (body instanceof InputStream)) {
+        final boolean streaming = getEndpoint().isUseStreaming();
+        if (streaming && (body instanceof InputStream)) {
             // For streaming, make it chunked encoding instead of specifying content length
             requestHeaders.put(Headers.TRANSFER_ENCODING, "chunked");
             clientCallback = new UndertowStreamingClientCallback(camelExchange, callback, getEndpoint(),
@@ -156,8 +157,14 @@ public class UndertowProducer extends DefaultAsyncProducer {
                 requestHeaders.put(Headers.CONTENT_LENGTH, bodyAsByte.remaining());
             }
 
-            clientCallback = new UndertowClientCallback(camelExchange, callback, getEndpoint(),
-                    request, bodyAsByte);
+            if (streaming) {
+                // response may receive streaming
+                clientCallback = new UndertowStreamingClientCallback(camelExchange, callback, getEndpoint(),
+                        request, bodyAsByte);
+            } else {
+                clientCallback = new UndertowClientCallback(camelExchange, callback, getEndpoint(),
+                        request, bodyAsByte);
+            }
         }
 
         if (log.isDebugEnabled()) {
diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowStreamingClientCallback.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowStreamingClientCallback.java
index b1e2e34..af87ef7 100644
--- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowStreamingClientCallback.java
+++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowStreamingClientCallback.java
@@ -28,7 +28,10 @@ import io.undertow.client.ClientExchange;
 import io.undertow.client.ClientRequest;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.IOHelper;
+import org.xnio.IoUtils;
 import org.xnio.channels.StreamSinkChannel;
 
 class UndertowStreamingClientCallback extends UndertowClientCallback {
@@ -37,19 +40,36 @@ class UndertowStreamingClientCallback extends UndertowClientCallback {
 
     UndertowStreamingClientCallback(Exchange exchange, AsyncCallback callback,
                                     UndertowEndpoint endpoint, ClientRequest request,
+                                    ByteBuffer body) {
+        super(exchange, callback, endpoint, request, body);
+        this.bodyStream = null;
+    }
+
+    UndertowStreamingClientCallback(Exchange exchange, AsyncCallback callback,
+                                    UndertowEndpoint endpoint, ClientRequest request,
                                     InputStream body) {
         super(exchange, callback, endpoint, request, null);
         this.bodyStream = body;
     }
 
     @Override
-    public void completed(ClientConnection connection) {
-        // no connection closing registered as streaming continues downstream
-        connection.sendRequest(request, on(this::performClientExchange));
+    protected void finish(Message result) {
+        boolean close = true;
+        if (result != null && result.getBody() instanceof InputStream) {
+            // no connection closing as streaming continues downstream
+            close = false;
+        }
+        finish(result, close);
     }
 
     @Override
     protected void writeRequest(ClientExchange clientExchange) {
+        if (bodyStream == null) {
+            super.writeRequest(clientExchange);
+            return;
+        }
+
+        // send request stream
         StreamSinkChannel requestChannel = clientExchange.getRequestChannel();
         try (ReadableByteChannel source = Channels.newChannel(bodyStream)) {
             IOHelper.transfer(source, requestChannel);
diff --git 
a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowHttpStreamingTest.java
 
b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowHttpStreamingTest.java
new file mode 100644
index 0000000..2c1d938
--- /dev/null
+++ 
b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowHttpStreamingTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.undertow;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Collections;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultExchange;
+import org.junit.Test;
+
+public class UndertowHttpStreamingTest extends BaseUndertowTest {
+
+    private static final String LINE =
+            String.join("", Collections.nCopies(100, "0123456789"));
+    private static final long COUNT = 1000; // approx. 1MB
+
+    @Test
+    public void testTwoWayStreaming() throws Exception {
+        long expectedLength = LINE.length() * COUNT;
+        MockEndpoint mock = getMockEndpoint("mock:length");
+        mock.expectedMessageCount(1);
+        mock.expectedBodiesReceived(expectedLength);
+
+        Exchange response = template.send(
+                "undertow:http://localhost:{{port}}?useStreaming=true",
+                e -> produceStream(e));
+        consumeStream(response);
+        long length = response.getIn().getBody(Long.class).longValue();
+
+        mock.assertIsSatisfied();
+        assertEquals(expectedLength, length);
+    }
+
+    @Test
+    public void testOneWayStreaming() throws Exception {
+        long expectedLength = LINE.length() * COUNT;
+        MockEndpoint mock = getMockEndpoint("mock:length");
+        mock.expectedMessageCount(1);
+        mock.expectedBodiesReceived(12);
+
+        Exchange response = template.send(
+                "undertow:http://localhost:{{port}}?useStreaming=true",
+                e -> { e.getIn().setBody("Hello Camel!"); });
+        consumeStream(response);
+        long length = response.getIn().getBody(Long.class).longValue();
+
+        mock.assertIsSatisfied();
+        assertEquals(expectedLength, length);
+    }
+
+    private static void produceStream(Exchange exchange) throws IOException {
+        PipedOutputStream out = new PipedOutputStream();
+        exchange.getIn().setBody(new PipedInputStream(out));
+        new Thread(() -> {
+            try (OutputStreamWriter osw = new OutputStreamWriter(out);
+                 BufferedWriter writer = new BufferedWriter(osw)) {
+                LongStream.range(0, COUNT).forEach(i -> {
+                    try {
+                        writer.write(LINE);
+                        writer.newLine();
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }).start();
+    }
+
+    private static void consumeStream(Exchange exchange) throws IOException {
+        try (InputStream in = exchange.getIn().getBody(InputStream.class);
+             BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
+            long length = reader.lines()
+                    .collect(Collectors.summingLong(String::length));
+            exchange.getIn().setBody(length);
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("undertow:http://localhost:{{port}}?useStreaming=true")
+                        .process(e -> consumeStream(e))
+                        .to("mock:length")
+                        .process(e -> produceStream(e));
+            }
+        };
+    }
+
+}

Reply via email to