This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 72e18efd15af749bde12fe71f923d94889c01fca
Author: Tadayoshi Sato <sato.tadayo...@gmail.com>
AuthorDate: Mon Jul 8 14:43:45 2019 +0900

    CAMEL-13734: camel-undertow - Support streaming for consumer endpoints
---
 .../src/main/docs/undertow-component.adoc          |  2 +-
 .../undertow/DefaultUndertowHttpBinding.java       | 14 ++++++++-
 .../undertow/RestUndertowHttpBinding.java          |  8 ++++++
 .../component/undertow/UndertowComponent.java      |  2 +-
 .../camel/component/undertow/UndertowConsumer.java | 18 ++++++++++++
 .../camel/component/undertow/UndertowEndpoint.java | 21 ++++++++++----
 .../dsl/UndertowEndpointBuilderFactory.java        | 33 +++++++++++++++++++++-
 7 files changed, 88 insertions(+), 10 deletions(-)

diff --git a/components/camel-undertow/src/main/docs/undertow-component.adoc 
b/components/camel-undertow/src/main/docs/undertow-component.adoc
index 480d953..612dfb6 100644
--- a/components/camel-undertow/src/main/docs/undertow-component.adoc
+++ b/components/camel-undertow/src/main/docs/undertow-component.adoc
@@ -90,6 +90,7 @@ with the following path and query parameters:
 | *httpMethodRestrict* (consumer) | Used to only allow consuming if the 
HttpMethod matches, such as GET/POST/PUT etc. Multiple methods can be specified 
separated by comma. |  | String
 | *matchOnUriPrefix* (consumer) | Whether or not the consumer should try to 
find a target consumer by matching the URI prefix if no exact match is found. | 
false | Boolean
 | *optionsEnabled* (consumer) | Specifies whether to enable HTTP OPTIONS for 
this Servlet consumer. By default OPTIONS is turned off. | false | boolean
+| *useStreaming* (consumer) | For HTTP endpoint: if true, text and binary 
messages will be wrapped as java.io.InputStream before they are passed to an 
Exchange; otherwise they will be passed as byte. For WebSocket endpoint: if 
true, text and binary messages will be wrapped as java.io.Reader and 
java.io.InputStream respectively before they are passed to an Exchange; 
otherwise they will be passed as String and byte respectively. | false | boolean
 | *exceptionHandler* (consumer) | To let the consumer use a custom 
ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this 
option is not in use. By default the consumer will deal with exceptions, that 
will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer 
creates an exchange. |  | ExchangePattern
 | *cookieHandler* (producer) | Configure a cookie handler to maintain a HTTP 
session |  | CookieHandler
@@ -108,7 +109,6 @@ with the following path and query parameters:
 | *fireWebSocketChannelEvents* (websocket) | if true, the consumer will post 
notifications to the route when a new WebSocket peer connects, disconnects, 
etc. See UndertowConstants.EVENT_TYPE and EventType. | false | boolean
 | *sendTimeout* (websocket) | Timeout in milliseconds when sending to a 
websocket channel. The default timeout is 30000 (30 seconds). | 30000 | Integer
 | *sendToAll* (websocket) | To send to all websocket subscribers. Can be used 
to configure on endpoint level, instead of having to use the 
UndertowConstants.SEND_TO_ALL header on the message. |  | Boolean
-| *useStreaming* (websocket) | if true, text and binary messages coming 
through a WebSocket will be wrapped as java.io.Reader and java.io.InputStream 
respectively before they are passed to an Exchange; otherwise they will be 
passed as String and byte respectively. | false | boolean
 | *sslContextParameters* (security) | To configure security using 
SSLContextParameters |  | SSLContextParameters
 |===
 // endpoint options: END
diff --git 
a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/DefaultUndertowHttpBinding.java
 
b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/DefaultUndertowHttpBinding.java
index 603a414..00eff07 100644
--- 
a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/DefaultUndertowHttpBinding.java
+++ 
b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/DefaultUndertowHttpBinding.java
@@ -56,6 +56,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.xnio.channels.BlockingReadableByteChannel;
 import org.xnio.channels.StreamSourceChannel;
+import org.xnio.streams.ChannelInputStream;
 
 /**
  * DefaultUndertowHttpBinding represent binding used by default, if user 
doesn't provide any.
@@ -68,10 +69,16 @@ public class DefaultUndertowHttpBinding implements 
UndertowHttpBinding {
     //use default filter strategy from Camel HTTP
     private HeaderFilterStrategy headerFilterStrategy;
     private Boolean transferException;
+    private boolean useStreaming;
 
     public DefaultUndertowHttpBinding() {
+        this(false);
+    }
+
+    public DefaultUndertowHttpBinding(boolean useStreaming) {
         this.headerFilterStrategy = new UndertowHeaderFilterStrategy();
         this.transferException = Boolean.FALSE;
+        this.useStreaming = useStreaming;
     }
 
     public DefaultUndertowHttpBinding(HeaderFilterStrategy 
headerFilterStrategy, Boolean transferException) {
@@ -123,7 +130,12 @@ public class DefaultUndertowHttpBinding implements 
UndertowHttpBinding {
             //extract body by myself if undertow parser didn't handle and the 
method is allowed to have one
             //body is extracted as byte[] then auto TypeConverter kicks in
             if (Methods.POST.equals(httpExchange.getRequestMethod()) || 
Methods.PUT.equals(httpExchange.getRequestMethod()) || 
Methods.PATCH.equals(httpExchange.getRequestMethod())) {
-                
result.setBody(readFromChannel(httpExchange.getRequestChannel()));
+                StreamSourceChannel source = httpExchange.getRequestChannel();
+                if (useStreaming) {
+                    result.setBody(new ChannelInputStream(source));
+                } else {
+                    result.setBody(readFromChannel(source));
+                }
             } else {
                 result.setBody(null);
             }
diff --git 
a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/RestUndertowHttpBinding.java
 
b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/RestUndertowHttpBinding.java
index 617ba46..24c61ac 100644
--- 
a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/RestUndertowHttpBinding.java
+++ 
b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/RestUndertowHttpBinding.java
@@ -23,6 +23,14 @@ import org.apache.camel.Exchange;
 
 public class RestUndertowHttpBinding extends DefaultUndertowHttpBinding {
 
+    public RestUndertowHttpBinding() {
+        super();
+    }
+
+    public RestUndertowHttpBinding(boolean useStreaming) {
+        super(useStreaming);
+    }
+
     @Override
     public void populateCamelHeaders(HttpServerExchange httpExchange, 
Map<String, Object> headersMap, Exchange exchange) throws Exception {
         super.populateCamelHeaders(httpExchange, headersMap, exchange);
diff --git 
a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowComponent.java
 
b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowComponent.java
index ff8dc14..2b5a5bc 100644
--- 
a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowComponent.java
+++ 
b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowComponent.java
@@ -246,7 +246,7 @@ public class UndertowComponent extends DefaultComponent 
implements RestConsumerF
 
         if (!map.containsKey("undertowHttpBinding")) {
             // use the rest binding, if not using a custom http binding
-            endpoint.setUndertowHttpBinding(new RestUndertowHttpBinding());
+            endpoint.setUndertowHttpBinding(new 
RestUndertowHttpBinding(endpoint.isUseStreaming()));
         }
 
         // configure consumer properties
diff --git 
a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java
 
b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java
index d700bf1..81ff826 100644
--- 
a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java
+++ 
b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java
@@ -17,6 +17,8 @@
 package org.apache.camel.component.undertow;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.Collection;
@@ -37,12 +39,14 @@ import io.undertow.websockets.core.WebSocketChannel;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
+import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.Processor;
 import org.apache.camel.TypeConverter;
 import org.apache.camel.component.undertow.UndertowConstants.EventType;
 import org.apache.camel.component.undertow.handlers.CamelWebSocketHandler;
 import org.apache.camel.support.DefaultConsumer;
 import org.apache.camel.util.CollectionStringBuffer;
+import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ObjectHelper;
 
 /**
@@ -161,6 +165,10 @@ public class UndertowConsumer extends DefaultConsumer 
implements HttpHandler {
             doneUoW(camelExchange);
         }
 
+        sendResponse(httpExchange, camelExchange);
+    }
+
+    private void sendResponse(HttpServerExchange httpExchange, Exchange 
camelExchange) throws IOException, NoTypeConversionAvailableException {
         Object body = getResponseBody(httpExchange, camelExchange);
         TypeConverter tc = getEndpoint().getCamelContext().getTypeConverter();
 
@@ -168,6 +176,16 @@ public class UndertowConsumer extends DefaultConsumer 
implements HttpHandler {
             log.trace("No payload to send as reply for exchange: {}", 
camelExchange);
             
httpExchange.getResponseHeaders().put(ExchangeHeaders.CONTENT_TYPE, 
MimeMappings.DEFAULT_MIME_MAPPINGS.get("txt"));
             httpExchange.getResponseSender().send("No response available");
+            return;
+        }
+
+        if (body instanceof InputStream) {
+            httpExchange.startBlocking();
+            try (InputStream input = (InputStream) body;
+                 OutputStream output = httpExchange.getOutputStream()) {
+                // flush on each write so that it won't cause OutOfMemoryError
+                IOHelper.copy(input, output, IOHelper.DEFAULT_BUFFER_SIZE, 
true);
+            }
         } else {
             ByteBuffer bodyAsByteBuffer = 
tc.mandatoryConvertTo(ByteBuffer.class, body);
             httpExchange.getResponseSender().send(bodyAsByteBuffer);
diff --git 
a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java
 
b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java
index d1ab5b1..e586077 100644
--- 
a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java
+++ 
b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java
@@ -80,6 +80,8 @@ public class UndertowEndpoint extends DefaultEndpoint 
implements AsyncEndpoint,
     private Boolean matchOnUriPrefix = Boolean.FALSE;
     @UriParam(label = "consumer", defaultValue = "false")
     private Boolean accessLog = Boolean.FALSE;
+    @UriParam(label = "consumer", defaultValue = "false")
+    private boolean useStreaming;
     @UriParam(label = "producer", defaultValue = "true")
     private Boolean throwExceptionOnFailure = Boolean.TRUE;
     @UriParam(label = "producer", defaultValue = "false")
@@ -101,8 +103,6 @@ public class UndertowEndpoint extends DefaultEndpoint 
implements AsyncEndpoint,
     @UriParam(label = "producer,websocket", defaultValue = "30000")
     private Integer sendTimeout = 30000;
     @UriParam(label = "consumer,websocket", defaultValue = "false")
-    private boolean useStreaming;
-    @UriParam(label = "consumer,websocket", defaultValue = "false")
     private boolean fireWebSocketChannelEvents;
 
     public UndertowEndpoint(String uri, UndertowComponent component) {
@@ -250,7 +250,7 @@ public class UndertowEndpoint extends DefaultEndpoint 
implements AsyncEndpoint,
     public UndertowHttpBinding getUndertowHttpBinding() {
         if (undertowHttpBinding == null) {
             // create a new binding and use the options from this endpoint
-            undertowHttpBinding = new DefaultUndertowHttpBinding();
+            undertowHttpBinding = new DefaultUndertowHttpBinding(useStreaming);
             
undertowHttpBinding.setHeaderFilterStrategy(getHeaderFilterStrategy());
             undertowHttpBinding.setTransferException(getTransferException());
         }
@@ -360,9 +360,18 @@ public class UndertowEndpoint extends DefaultEndpoint 
implements AsyncEndpoint,
     }
 
     /**
-     * if {@code true}, text and binary messages coming through a WebSocket 
will be wrapped as java.io.Reader and
-     * java.io.InputStream respectively before they are passed to an {@link 
Exchange}; otherwise they will be passed as
-     * String and byte[] respectively.
+     * <p>
+     * For HTTP endpoint:
+     * if {@code true}, text and binary messages will be wrapped as {@link 
java.io.InputStream}
+     * before they are passed to an {@link Exchange}; otherwise they will be 
passed as byte[].
+     * </p>
+     *
+     * <p>
+     * For WebSocket endpoint:
+     * if {@code true}, text and binary messages will be wrapped as {@link 
java.io.Reader} and
+     * {@link java.io.InputStream} respectively before they are passed to an 
{@link Exchange};
+     * otherwise they will be passed as String and byte[] respectively.
+     * </p>
      */
     public void setUseStreaming(boolean useStreaming) {
         this.useStreaming = useStreaming;
diff --git 
a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/UndertowEndpointBuilderFactory.java
 
b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/UndertowEndpointBuilderFactory.java
index 972fc71..40b114f 100644
--- 
a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/UndertowEndpointBuilderFactory.java
+++ 
b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/UndertowEndpointBuilderFactory.java
@@ -171,6 +171,37 @@ public interface UndertowEndpointBuilderFactory {
             return this;
         }
         /**
+         * For HTTP endpoint: if true, text and binary messages will be wrapped
+         * as java.io.InputStream before they are passed to an Exchange;
+         * otherwise they will be passed as byte. For WebSocket endpoint: if
+         * true, text and binary messages will be wrapped as java.io.Reader and
+         * java.io.InputStream respectively before they are passed to an
+         * Exchange; otherwise they will be passed as String and byte
+         * respectively.
+         * The option is a <code>boolean</code> type.
+         * @group consumer
+         */
+        default UndertowEndpointConsumerBuilder useStreaming(
+                boolean useStreaming) {
+            setProperty("useStreaming", useStreaming);
+            return this;
+        }
+        /**
+         * For HTTP endpoint: if true, text and binary messages will be wrapped
+         * as java.io.InputStream before they are passed to an Exchange;
+         * otherwise they will be passed as byte. For WebSocket endpoint: if
+         * true, text and binary messages will be wrapped as java.io.Reader and
+         * java.io.InputStream respectively before they are passed to an
+         * Exchange; otherwise they will be passed as String and byte
+         * respectively.
+         * The option will be converted to a <code>boolean</code> type.
+         * @group consumer
+         */
+        default UndertowEndpointConsumerBuilder useStreaming(String 
useStreaming) {
+            setProperty("useStreaming", useStreaming);
+            return this;
+        }
+        /**
          * if true, the consumer will post notifications to the route when a 
new
          * WebSocket peer connects, disconnects, etc. See
          * UndertowConstants.EVENT_TYPE and EventType.
@@ -1147,4 +1178,4 @@ public interface UndertowEndpointBuilderFactory {
         }
         return new UndertowEndpointBuilderImpl(path);
     }
-}
\ No newline at end of file
+}

Reply via email to