This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 0747f77e50b08e2248b5db7e63cc4f4cd8b91dbe
Author: Tadayoshi Sato <sato.tadayo...@gmail.com>
AuthorDate: Mon Jul 8 15:42:24 2019 +0900

    CAMEL-13734: camel-undertow - Support streaming for producer endpoints
---
 .../src/main/docs/undertow-component.adoc          |   2 +-
 .../undertow/DefaultUndertowHttpBinding.java       |  17 ++-
 .../component/undertow/UndertowClientCallback.java |  18 +--
 .../camel/component/undertow/UndertowConsumer.java |   4 +-
 .../camel/component/undertow/UndertowEndpoint.java |   4 +-
 .../camel/component/undertow/UndertowProducer.java | 117 ++++++++-------
 .../undertow/UndertowStreamingClientCallback.java  |  61 ++++++++
 .../dsl/UndertowEndpointBuilderFactory.java        | 166 +++++++++++++--------
 .../main/java/org/apache/camel/util/IOHelper.java  |  13 ++
 .../modules/ROOT/pages/undertow-component.adoc     |   2 +-
 10 files changed, 271 insertions(+), 133 deletions(-)
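
Note on the IOHelper change: the new IOHelper.transfer(ReadableByteChannel,
WritableByteChannel) is a read/flip/drain/clear loop over one reusable
ByteBuffer. Below is a minimal, self-contained sketch of that loop outside of
Camel. The class name ChannelTransferSketch and the BUFFER_SIZE constant are
hypothetical and exist only for this illustration, and unlike the method in
the patch this version returns the number of bytes copied so the result is
easy to check.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;

public class ChannelTransferSketch {

    // Hypothetical buffer size for this sketch; the patch uses IOHelper.DEFAULT_BUFFER_SIZE.
    static final int BUFFER_SIZE = 4096;

    // Copies everything from input to output and returns the byte count.
    // (The method added by the patch has the same loop but returns void.)
    public static long transfer(ReadableByteChannel input, WritableByteChannel output) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
        long total = 0;
        // read() returns -1 once the source is exhausted
        while (input.read(buffer) >= 0) {
            buffer.flip();                    // switch the buffer from filling to draining
            while (buffer.hasRemaining()) {   // write() may consume only part of the buffer
                total += output.write(buffer);
            }
            buffer.clear();                   // make the whole buffer available for the next read
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = "streamed request body".getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (ReadableByteChannel in = Channels.newChannel(new ByteArrayInputStream(payload));
             WritableByteChannel out = Channels.newChannel(sink)) {
            System.out.println(transfer(in, out) + " bytes copied");
        }
    }
}
```

The inner loop matters because a single write() call is allowed to accept
fewer bytes than the buffer holds; without it, part of a chunk could be
dropped when the buffer is cleared.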

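Note on the DefaultUndertowHttpBinding change: when useStreaming is enabled,
the response body is handed to the route as a stream whose close() also closes
the client connection, since the connection can only be released once the
body has been read. The sketch below shows the same idea with plain JDK
types. CloseAfterReadSketch and OwnerClosingInputStream are hypothetical
names, and a generic Closeable stands in for the Undertow connection; this is
not the code from the patch.

```java
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

public class CloseAfterReadSketch {

    /** Stream wrapper that also closes an owning resource when the stream is closed. */
    public static class OwnerClosingInputStream extends FilterInputStream {
        private final Closeable owner;

        public OwnerClosingInputStream(InputStream delegate, Closeable owner) {
            super(delegate);
            this.owner = owner;
        }

        @Override
        public void close() throws IOException {
            try {
                super.close();
            } finally {
                // runs even if closing the underlying stream throws
                owner.close();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        boolean[] connectionClosed = {false};
        // stand-in for the client connection (assumption for this sketch)
        Closeable connection = () -> { connectionClosed[0] = true; };

        try (InputStream body = new OwnerClosingInputStream(
                new ByteArrayInputStream(new byte[] {1, 2, 3}), connection)) {
            while (body.read() >= 0) {
                // consume the body; the connection stays open while reading
            }
        }
        System.out.println("connection closed: " + connectionClosed[0]);
    }
}
```

The consequence for route authors is that a streamed response body has to be
closed by whoever consumes it, otherwise the underlying connection stays open.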
diff --git a/components/camel-undertow/src/main/docs/undertow-component.adoc b/components/camel-undertow/src/main/docs/undertow-component.adoc
index 612dfb6..1e08541 100644
--- a/components/camel-undertow/src/main/docs/undertow-component.adoc
+++ b/components/camel-undertow/src/main/docs/undertow-component.adoc
@@ -85,12 +85,12 @@ with the following path and query parameters:
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
+| *useStreaming* (common) | For HTTP endpoint: if true, text and binary messages will be wrapped as java.io.InputStream before they are passed to an Exchange; otherwise they will be passed as byte. For WebSocket endpoint: if true, text and binary messages will be wrapped as java.io.Reader and java.io.InputStream respectively before they are passed to an Exchange; otherwise they will be passed as String and byte respectively. | false | boolean
 | *accessLog* (consumer) | Whether or not the consumer should write access log | false | Boolean
 | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean
 | *httpMethodRestrict* (consumer) | Used to only allow consuming if the HttpMethod matches, such as GET/POST/PUT etc. Multiple methods can be specified separated by comma. |  | String
 | *matchOnUriPrefix* (consumer) | Whether or not the consumer should try to find a target consumer by matching the URI prefix if no exact match is found. | false | Boolean
 | *optionsEnabled* (consumer) | Specifies whether to enable HTTP OPTIONS for this Servlet consumer. By default OPTIONS is turned off. | false | boolean
-| *useStreaming* (consumer) | For HTTP endpoint: if true, text and binary messages will be wrapped as java.io.InputStream before they are passed to an Exchange; otherwise they will be passed as byte. For WebSocket endpoint: if true, text and binary messages will be wrapped as java.io.Reader and java.io.InputStream respectively before they are passed to an Exchange; otherwise they will be passed as String and byte respectively. | false | boolean
 | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. |  | ExchangePattern
 | *cookieHandler* (producer) | Configure a cookie handler to maintain a HTTP session |  | CookieHandler
diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/DefaultUndertowHttpBinding.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/DefaultUndertowHttpBinding.java
index 00eff07..28211de 100644
--- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/DefaultUndertowHttpBinding.java
+++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/DefaultUndertowHttpBinding.java
@@ -150,7 +150,22 @@ public class DefaultUndertowHttpBinding implements UndertowHttpBinding {
         //retrieve response headers
         populateCamelHeaders(clientExchange.getResponse(), result.getHeaders(), exchange);
 
-        result.setBody(readFromChannel(clientExchange.getResponseChannel()));
+        StreamSourceChannel source = clientExchange.getResponseChannel();
+        if (useStreaming) {
+            // client connection can be closed only after input stream is fully read
+            result.setBody(new ChannelInputStream(source) {
+                @Override
+                public void close() throws IOException {
+                    try {
+                        super.close();
+                    } finally {
+                        clientExchange.getConnection().close();
+                    }
+                }
+            });
+        } else {
+            result.setBody(readFromChannel(source));
+        }
 
         return result;
     }
diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowClientCallback.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowClientCallback.java
index 12cf09b..f6c3c77 100644
--- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowClientCallback.java
+++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowClientCallback.java
@@ -55,11 +55,11 @@ import org.xnio.channels.StreamSinkChannel;
  * Undertow {@link ClientCallback} that will get notified when the HTTP
  * connection is ready or when the client failed to connect. It will also handle
  * writing the request and reading the response in
- * {@link #writeRequest(ClientExchange, ByteBuffer)} and
+ * {@link #writeRequest(ClientExchange)} and
  * {@link #setupResponseListener(ClientExchange)}. The main entry point is
  * {@link #completed(ClientConnection)} or {@link #failed(IOException)} in case
  * of errors, every error condition that should terminate Camel {@link Exchange}
- * should go to {@link #hasFailedWith(Exception)} and successful execution of
+ * should go to {@link #hasFailedWith(Throwable)} and successful execution of
  * the exchange should end with {@link #finish(Message)}. Any
  * {@link ClientCallback}s that are added here should extend
  * {@link ErrorHandlingClientCallback}, best way to do that is to use the
@@ -104,11 +104,11 @@ class UndertowClientCallback implements ClientCallback<ClientConnection> {
      */
     private final BlockingDeque<Closeable> closables = new LinkedBlockingDeque<>();
 
-    private final UndertowEndpoint endpoint;
+    protected final UndertowEndpoint endpoint;
 
-    private final Exchange exchange;
+    protected final Exchange exchange;
 
-    private final ClientRequest request;
+    protected final ClientRequest request;
 
     private final Boolean throwExceptionOnFailure;
 
@@ -119,7 +119,7 @@ class UndertowClientCallback implements ClientCallback<ClientConnection> {
         this.endpoint = endpoint;
         this.request = request;
         this.body = body;
-        throwExceptionOnFailure = endpoint.getThrowExceptionOnFailure();
+        this.throwExceptionOnFailure = endpoint.getThrowExceptionOnFailure();
     }
 
     @Override
@@ -186,7 +186,7 @@ class UndertowClientCallback implements ClientCallback<ClientConnection> {
         finish(null);
     }
 
-    <T> ClientCallback<T> on(final Consumer<T> consumer) {
+    protected <T> ClientCallback<T> on(final Consumer<T> consumer) {
         return new ErrorHandlingClientCallback<>(consumer);
     }
 
@@ -196,7 +196,7 @@ class UndertowClientCallback implements ClientCallback<ClientConnection> {
         setupResponseListener(clientExchange);
 
         // write the request
-        writeRequest(clientExchange, body);
+        writeRequest(clientExchange);
     }
 
     void setupResponseListener(final ClientExchange clientExchange) {
@@ -266,7 +266,7 @@ class UndertowClientCallback implements ClientCallback<ClientConnection> {
         }
     }
 
-    void writeRequest(final ClientExchange clientExchange, final ByteBuffer body) {
+    protected void writeRequest(final ClientExchange clientExchange) {
         final StreamSinkChannel requestChannel = clientExchange.getRequestChannel();
         if (body != null) {
             try {
diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java
index 81ff826..72b9e49 100644
--- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java
+++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java
@@ -170,7 +170,6 @@ public class UndertowConsumer extends DefaultConsumer implements HttpHandler {
 
     private void sendResponse(HttpServerExchange httpExchange, Exchange camelExchange) throws IOException, NoTypeConversionAvailableException {
         Object body = getResponseBody(httpExchange, camelExchange);
-        TypeConverter tc = getEndpoint().getCamelContext().getTypeConverter();
 
         if (body == null) {
             log.trace("No payload to send as reply for exchange: {}", camelExchange);
@@ -179,7 +178,7 @@ public class UndertowConsumer extends DefaultConsumer implements HttpHandler {
             return;
         }
 
-        if (body instanceof InputStream) {
+        if (getEndpoint().isUseStreaming() && (body instanceof InputStream)) {
             httpExchange.startBlocking();
             try (InputStream input = (InputStream) body;
                  OutputStream output = httpExchange.getOutputStream()) {
@@ -187,6 +186,7 @@ public class UndertowConsumer extends DefaultConsumer implements HttpHandler {
                 IOHelper.copy(input, output, IOHelper.DEFAULT_BUFFER_SIZE, true);
             }
         } else {
+            TypeConverter tc = getEndpoint().getCamelContext().getTypeConverter();
             ByteBuffer bodyAsByteBuffer = tc.mandatoryConvertTo(ByteBuffer.class, body);
             httpExchange.getResponseSender().send(bodyAsByteBuffer);
         }
diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java
index e586077..83754cf 100644
--- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java
+++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java
@@ -66,6 +66,8 @@ public class UndertowEndpoint extends DefaultEndpoint implements AsyncEndpoint,
 
     @UriPath @Metadata(required = true)
     private URI httpURI;
+    @UriParam(label = "common", defaultValue = "false")
+    private boolean useStreaming;
     @UriParam(label = "advanced")
     private UndertowHttpBinding undertowHttpBinding;
     @UriParam(label = "advanced")
@@ -80,8 +82,6 @@ public class UndertowEndpoint extends DefaultEndpoint implements AsyncEndpoint,
     private Boolean matchOnUriPrefix = Boolean.FALSE;
     @UriParam(label = "consumer", defaultValue = "false")
     private Boolean accessLog = Boolean.FALSE;
-    @UriParam(label = "consumer", defaultValue = "false")
-    private boolean useStreaming;
     @UriParam(label = "producer", defaultValue = "true")
     private Boolean throwExceptionOnFailure = Boolean.TRUE;
     @UriParam(label = "producer", defaultValue = "false")
diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java
index af970a4..7b498a4 100644
--- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java
+++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java
@@ -26,7 +26,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-
 import javax.net.ssl.SSLContext;
 
 import io.undertow.client.ClientRequest;
@@ -89,51 +88,62 @@ public class UndertowProducer extends DefaultAsyncProducer {
     public boolean process(final Exchange camelExchange, final AsyncCallback callback) {
         if (endpoint.isWebSocket()) {
             return processWebSocket(camelExchange, callback);
-        } else {
-            /* not a WebSocket */
-            final URI uri;
-            final HttpString method;
+        }
+
+        /* not a WebSocket */
+        final URI uri;
+        final HttpString method;
+        try {
+            final String exchangeUri = UndertowHelper.createURL(camelExchange, getEndpoint());
+            uri = UndertowHelper.createURI(camelExchange, exchangeUri, getEndpoint());
+            method = UndertowHelper.createMethod(camelExchange, endpoint, camelExchange.getIn().getBody() != null);
+        } catch (final URISyntaxException e) {
+            camelExchange.setException(e);
+            callback.done(true);
+            return true;
+        }
+
+        final String pathAndQuery = URISupport.pathAndQueryOf(uri);
+
+        final UndertowHttpBinding undertowHttpBinding = endpoint.getUndertowHttpBinding();
+
+        final CookieHandler cookieHandler = endpoint.getCookieHandler();
+        final Map<String, List<String>> cookieHeaders;
+        if (cookieHandler != null) {
             try {
-                final String exchangeUri = UndertowHelper.createURL(camelExchange, getEndpoint());
-                uri = UndertowHelper.createURI(camelExchange, exchangeUri, getEndpoint());
-                method = UndertowHelper.createMethod(camelExchange, endpoint, camelExchange.getIn().getBody() != null);
-            } catch (final URISyntaxException e) {
+                cookieHeaders = cookieHandler.loadCookies(camelExchange, uri);
+            } catch (final IOException e) {
                 camelExchange.setException(e);
                 callback.done(true);
                 return true;
             }
+        } else {
+            cookieHeaders = Collections.emptyMap();
+        }
 
-            final String pathAndQuery = URISupport.pathAndQueryOf(uri);
-
-            final UndertowHttpBinding undertowHttpBinding = endpoint.getUndertowHttpBinding();
-
-            final CookieHandler cookieHandler = endpoint.getCookieHandler();
-            final Map<String, List<String>> cookieHeaders;
-            if (cookieHandler != null) {
-                try {
-                    cookieHeaders = cookieHandler.loadCookies(camelExchange, uri);
-                } catch (final IOException e) {
-                    camelExchange.setException(e);
-                    callback.done(true);
-                    return true;
-                }
-            } else {
-                cookieHeaders = Collections.emptyMap();
-            }
-
-            final ClientRequest request = new ClientRequest();
-            request.setMethod(method);
-            request.setPath(pathAndQuery);
+        final ClientRequest request = new ClientRequest();
+        request.setMethod(method);
+        request.setPath(pathAndQuery);
 
-            final HeaderMap requestHeaders = request.getRequestHeaders();
+        final HeaderMap requestHeaders = request.getRequestHeaders();
 
-            // Set the Host header
-            final Message message = camelExchange.getIn();
-            final String host = message.getHeader(Headers.HOST_STRING, String.class);
-            requestHeaders.put(Headers.HOST, Optional.ofNullable(host).orElseGet(() -> uri.getAuthority()));
+        // Set the Host header
+        final Message message = camelExchange.getIn();
+        final String host = message.getHeader(Headers.HOST_STRING, String.class);
+        requestHeaders.put(Headers.HOST, Optional.ofNullable(host).orElseGet(uri::getAuthority));
 
-            final Object body = undertowHttpBinding.toHttpRequest(request, camelExchange.getIn());
+        cookieHeaders.forEach((key, values) -> {
+            requestHeaders.putAll(HttpString.tryFromString(key), values);
+        });
 
+        final Object body = undertowHttpBinding.toHttpRequest(request, camelExchange.getIn());
+        final UndertowClientCallback clientCallback;
+        if (getEndpoint().isUseStreaming() && (body instanceof InputStream)) {
+            // For streaming, make it chunked encoding instead of specifying content length
+            requestHeaders.put(Headers.TRANSFER_ENCODING, "chunked");
+            clientCallback = new UndertowStreamingClientCallback(camelExchange, callback, getEndpoint(),
+                    request, (InputStream) body);
+        } else {
             final TypeConverter tc = endpoint.getCamelContext().getTypeConverter();
             final ByteBuffer bodyAsByte = tc.tryConvertTo(ByteBuffer.class, body);
 
@@ -143,29 +153,24 @@ public class UndertowProducer extends DefaultAsyncProducer {
                 requestHeaders.put(Headers.CONTENT_LENGTH, bodyAsByte.remaining());
             }
 
-            for (final Map.Entry<String, List<String>> entry : cookieHeaders.entrySet()) {
-                requestHeaders.putAll(HttpString.tryFromString(entry.getKey()), entry.getValue());
-            }
-
-            if (log.isDebugEnabled()) {
-                log.debug("Executing http {} method: {}", method, pathAndQuery);
-            }
-
-            final UndertowClientCallback clientCallback = new UndertowClientCallback(camelExchange, callback, getEndpoint(),
-                request, bodyAsByte);
-
-            // when connect succeeds or fails UndertowClientCallback will
-            // get notified on a I/O thread run by Xnio worker. The writing
-            // of request and reading of response is performed also in the
-            // callback
-            client.connect(clientCallback, uri, worker, ssl, pool, options);
+            clientCallback = new UndertowClientCallback(camelExchange, callback, getEndpoint(),
+                    request, bodyAsByte);
+        }
 
-            // the call above will proceed on Xnio I/O thread we will
-            // notify the exchange asynchronously when the HTTP exchange
-            // ends with success or failure from UndertowClientCallback
-            return false;
+        if (log.isDebugEnabled()) {
+            log.debug("Executing http {} method: {}", method, pathAndQuery);
         }
 
+        // when connect succeeds or fails UndertowClientCallback will
+        // get notified on a I/O thread run by Xnio worker. The writing
+        // of request and reading of response is performed also in the
+        // callback
+        client.connect(clientCallback, uri, worker, ssl, pool, options);
+
+        // the call above will proceed on Xnio I/O thread we will
+        // notify the exchange asynchronously when the HTTP exchange
+        // ends with success or failure from UndertowClientCallback
+        return false;
     }
 
     private boolean processWebSocket(final Exchange camelExchange, final AsyncCallback camelCallback) {
diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowStreamingClientCallback.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowStreamingClientCallback.java
new file mode 100644
index 0000000..b1e2e34
--- /dev/null
+++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowStreamingClientCallback.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.undertow;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+
+import io.undertow.client.ClientConnection;
+import io.undertow.client.ClientExchange;
+import io.undertow.client.ClientRequest;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.util.IOHelper;
+import org.xnio.channels.StreamSinkChannel;
+
+class UndertowStreamingClientCallback extends UndertowClientCallback {
+
+    private InputStream bodyStream;
+
+    UndertowStreamingClientCallback(Exchange exchange, AsyncCallback callback,
+                                    UndertowEndpoint endpoint, ClientRequest request,
+                                    InputStream body) {
+        super(exchange, callback, endpoint, request, null);
+        this.bodyStream = body;
+    }
+
+    @Override
+    public void completed(ClientConnection connection) {
+        // no connection closing registered as streaming continues downstream
+        connection.sendRequest(request, on(this::performClientExchange));
+    }
+
+    @Override
+    protected void writeRequest(ClientExchange clientExchange) {
+        StreamSinkChannel requestChannel = clientExchange.getRequestChannel();
+        try (ReadableByteChannel source = Channels.newChannel(bodyStream)) {
+            IOHelper.transfer(source, requestChannel);
+            flush(requestChannel);
+        } catch (final IOException e) {
+            hasFailedWith(e);
+        }
+    }
+}
diff --git a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/UndertowEndpointBuilderFactory.java b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/UndertowEndpointBuilderFactory.java
index 40b114f..5c70e7c 100644
--- a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/UndertowEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/UndertowEndpointBuilderFactory.java
@@ -45,6 +45,41 @@ public interface UndertowEndpointBuilderFactory {
             return (AdvancedUndertowEndpointConsumerBuilder) this;
         }
         /**
+         * For HTTP endpoint: if true, text and binary messages will be wrapped
+         * as java.io.InputStream before they are passed to an Exchange;
+         * otherwise they will be passed as byte. For WebSocket endpoint: if
+         * true, text and binary messages will be wrapped as java.io.Reader and
+         * java.io.InputStream respectively before they are passed to an
+         * Exchange; otherwise they will be passed as String and byte
+         * respectively.
+         * 
+         * The option is a: <code>boolean</code> type.
+         * 
+         * Group: common
+         */
+        default UndertowEndpointConsumerBuilder useStreaming(
+                boolean useStreaming) {
+            setProperty("useStreaming", useStreaming);
+            return this;
+        }
+        /**
+         * For HTTP endpoint: if true, text and binary messages will be wrapped
+         * as java.io.InputStream before they are passed to an Exchange;
+         * otherwise they will be passed as byte. For WebSocket endpoint: if
+         * true, text and binary messages will be wrapped as java.io.Reader and
+         * java.io.InputStream respectively before they are passed to an
+         * Exchange; otherwise they will be passed as String and byte
+         * respectively.
+         * 
+         * The option will be converted to a <code>boolean</code> type.
+         * 
+         * Group: common
+         */
+        default UndertowEndpointConsumerBuilder useStreaming(String useStreaming) {
+            setProperty("useStreaming", useStreaming);
+            return this;
+        }
+        /**
          * Whether or not the consumer should write access log.
          * 
          * The option is a: <code>java.lang.Boolean</code> type.
@@ -171,37 +206,6 @@ public interface UndertowEndpointBuilderFactory {
             return this;
         }
         /**
-         * For HTTP endpoint: if true, text and binary messages will be wrapped
-         * as java.io.InputStream before they are passed to an Exchange;
-         * otherwise they will be passed as byte. For WebSocket endpoint: if
-         * true, text and binary messages will be wrapped as java.io.Reader and
-         * java.io.InputStream respectively before they are passed to an
-         * Exchange; otherwise they will be passed as String and byte
-         * respectively.
-         * The option is a <code>boolean</code> type.
-         * @group consumer
-         */
-        default UndertowEndpointConsumerBuilder useStreaming(
-                boolean useStreaming) {
-            setProperty("useStreaming", useStreaming);
-            return this;
-        }
-        /**
-         * For HTTP endpoint: if true, text and binary messages will be wrapped
-         * as java.io.InputStream before they are passed to an Exchange;
-         * otherwise they will be passed as byte. For WebSocket endpoint: if
-         * true, text and binary messages will be wrapped as java.io.Reader and
-         * java.io.InputStream respectively before they are passed to an
-         * Exchange; otherwise they will be passed as String and byte
-         * respectively.
-         * The option will be converted to a <code>boolean</code> type.
-         * @group consumer
-         */
-        default UndertowEndpointConsumerBuilder useStreaming(String useStreaming) {
-            setProperty("useStreaming", useStreaming);
-            return this;
-        }
-        /**
          * if true, the consumer will post notifications to the route when a new
          * WebSocket peer connects, disconnects, etc. See
          * UndertowConstants.EVENT_TYPE and EventType.
@@ -230,35 +234,6 @@ public interface UndertowEndpointBuilderFactory {
             return this;
         }
         /**
-         * if true, text and binary messages coming through a WebSocket will be
-         * wrapped as java.io.Reader and java.io.InputStream respectively before
-         * they are passed to an Exchange; otherwise they will be passed as
-         * String and byte respectively.
-         * 
-         * The option is a: <code>boolean</code> type.
-         * 
-         * Group: websocket
-         */
-        default UndertowEndpointConsumerBuilder useStreaming(
-                boolean useStreaming) {
-            setProperty("useStreaming", useStreaming);
-            return this;
-        }
-        /**
-         * if true, text and binary messages coming through a WebSocket will be
-         * wrapped as java.io.Reader and java.io.InputStream respectively before
-         * they are passed to an Exchange; otherwise they will be passed as
-         * String and byte respectively.
-         * 
-         * The option will be converted to a <code>boolean</code> type.
-         * 
-         * Group: websocket
-         */
-        default UndertowEndpointConsumerBuilder useStreaming(String useStreaming) {
-            setProperty("useStreaming", useStreaming);
-            return this;
-        }
-        /**
          * To configure security using SSLContextParameters.
          * 
          * The option is a:
@@ -504,6 +479,41 @@ public interface UndertowEndpointBuilderFactory {
             return (AdvancedUndertowEndpointProducerBuilder) this;
         }
         /**
+         * For HTTP endpoint: if true, text and binary messages will be wrapped
+         * as java.io.InputStream before they are passed to an Exchange;
+         * otherwise they will be passed as byte. For WebSocket endpoint: if
+         * true, text and binary messages will be wrapped as java.io.Reader and
+         * java.io.InputStream respectively before they are passed to an
+         * Exchange; otherwise they will be passed as String and byte
+         * respectively.
+         * 
+         * The option is a: <code>boolean</code> type.
+         * 
+         * Group: common
+         */
+        default UndertowEndpointProducerBuilder useStreaming(
+                boolean useStreaming) {
+            setProperty("useStreaming", useStreaming);
+            return this;
+        }
+        /**
+         * For HTTP endpoint: if true, text and binary messages will be wrapped
+         * as java.io.InputStream before they are passed to an Exchange;
+         * otherwise they will be passed as byte. For WebSocket endpoint: if
+         * true, text and binary messages will be wrapped as java.io.Reader and
+         * java.io.InputStream respectively before they are passed to an
+         * Exchange; otherwise they will be passed as String and byte
+         * respectively.
+         * 
+         * The option will be converted to a <code>boolean</code> type.
+         * 
+         * Group: common
+         */
+        default UndertowEndpointProducerBuilder useStreaming(String useStreaming) {
+            setProperty("useStreaming", useStreaming);
+            return this;
+        }
+        /**
          * Configure a cookie handler to maintain a HTTP session.
          * 
          * The option is a:
@@ -981,6 +991,40 @@ public interface UndertowEndpointBuilderFactory {
             return (AdvancedUndertowEndpointBuilder) this;
         }
         /**
+         * For HTTP endpoint: if true, text and binary messages will be wrapped
+         * as java.io.InputStream before they are passed to an Exchange;
+         * otherwise they will be passed as byte. For WebSocket endpoint: if
+         * true, text and binary messages will be wrapped as java.io.Reader and
+         * java.io.InputStream respectively before they are passed to an
+         * Exchange; otherwise they will be passed as String and byte
+         * respectively.
+         * 
+         * The option is a: <code>boolean</code> type.
+         * 
+         * Group: common
+         */
+        default UndertowEndpointBuilder useStreaming(boolean useStreaming) {
+            setProperty("useStreaming", useStreaming);
+            return this;
+        }
+        /**
+         * For HTTP endpoint: if true, text and binary messages will be wrapped
+         * as java.io.InputStream before they are passed to an Exchange;
+         * otherwise they will be passed as byte. For WebSocket endpoint: if
+         * true, text and binary messages will be wrapped as java.io.Reader and
+         * java.io.InputStream respectively before they are passed to an
+         * Exchange; otherwise they will be passed as String and byte
+         * respectively.
+         * 
+         * The option will be converted to a <code>boolean</code> type.
+         * 
+         * Group: common
+         */
+        default UndertowEndpointBuilder useStreaming(String useStreaming) {
+            setProperty("useStreaming", useStreaming);
+            return this;
+        }
+        /**
          * To configure security using SSLContextParameters.
          * 
          * The option is a:
@@ -1178,4 +1222,4 @@ public interface UndertowEndpointBuilderFactory {
         }
         return new UndertowEndpointBuilderImpl(path);
     }
-}
+}
\ No newline at end of file
diff --git a/core/camel-util/src/main/java/org/apache/camel/util/IOHelper.java b/core/camel-util/src/main/java/org/apache/camel/util/IOHelper.java
index 565b54c..701ee6e 100644
--- a/core/camel-util/src/main/java/org/apache/camel/util/IOHelper.java
+++ b/core/camel-util/src/main/java/org/apache/camel/util/IOHelper.java
@@ -37,6 +37,8 @@ import java.io.Writer;
 import java.nio.ByteBuffer;
 import java.nio.CharBuffer;
 import java.nio.channels.FileChannel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
 import java.nio.charset.Charset;
 import java.nio.charset.UnsupportedCharsetException;
 import java.util.function.Supplier;
@@ -251,6 +253,17 @@ public final class IOHelper {
         return total;
     }
 
+    public static void transfer(ReadableByteChannel input, WritableByteChannel output) throws IOException {
+        ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+        while (input.read(buffer) >= 0) {
+            buffer.flip();
+            while (buffer.hasRemaining()) {
+                output.write(buffer);
+            }
+            buffer.clear();
+        }
+    }
+
     /**
      * Forces any updates to this channel's file to be written to the storage
      * device that contains it.
diff --git a/docs/components/modules/ROOT/pages/undertow-component.adoc b/docs/components/modules/ROOT/pages/undertow-component.adoc
index 480d953..612dfb6 100644
--- a/docs/components/modules/ROOT/pages/undertow-component.adoc
+++ b/docs/components/modules/ROOT/pages/undertow-component.adoc
@@ -90,6 +90,7 @@ with the following path and query parameters:
 | *httpMethodRestrict* (consumer) | Used to only allow consuming if the HttpMethod matches, such as GET/POST/PUT etc. Multiple methods can be specified separated by comma. |  | String
 | *matchOnUriPrefix* (consumer) | Whether or not the consumer should try to find a target consumer by matching the URI prefix if no exact match is found. | false | Boolean
 | *optionsEnabled* (consumer) | Specifies whether to enable HTTP OPTIONS for this Servlet consumer. By default OPTIONS is turned off. | false | boolean
+| *useStreaming* (consumer) | For HTTP endpoint: if true, text and binary messages will be wrapped as java.io.InputStream before they are passed to an Exchange; otherwise they will be passed as byte[]. For WebSocket endpoint: if true, text and binary messages will be wrapped as java.io.Reader and java.io.InputStream respectively before they are passed to an Exchange; otherwise they will be passed as String and byte[] respectively. | false | boolean
 | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. |  | ExchangePattern
 | *cookieHandler* (producer) | Configure a cookie handler to maintain a HTTP session |  | CookieHandler
@@ -108,7 +109,6 @@ with the following path and query parameters:
 | *fireWebSocketChannelEvents* (websocket) | if true, the consumer will post notifications to the route when a new WebSocket peer connects, disconnects, etc. See UndertowConstants.EVENT_TYPE and EventType. | false | boolean
 | *sendTimeout* (websocket) | Timeout in milliseconds when sending to a websocket channel. The default timeout is 30000 (30 seconds). | 30000 | Integer
 | *sendToAll* (websocket) | To send to all websocket subscribers. Can be used to configure on endpoint level, instead of having to use the UndertowConstants.SEND_TO_ALL header on the message. |  | Boolean
-| *useStreaming* (websocket) | if true, text and binary messages coming through a WebSocket will be wrapped as java.io.Reader and java.io.InputStream respectively before they are passed to an Exchange; otherwise they will be passed as String and byte[] respectively. | false | boolean
 | *sslContextParameters* (security) | To configure security using SSLContextParameters |  | SSLContextParameters
 |===
 // endpoint options: END
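
Note on the new IOHelper.transfer helper in the diff above: the sketch below reproduces the same read/flip/drain/clear loop outside of Camel so it can be run on its own. The class name ChannelTransferSketch, the roundTrip helper and the 8-byte BUFFER_SIZE are assumptions made for illustration only; the committed method uses IOHelper's DEFAULT_BUFFER_SIZE. The sketch also assumes blocking channels, since a non-blocking source that returns 0 from read() would make this loop spin.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;

public class ChannelTransferSketch {

    // Hypothetical buffer size, kept small so the loop runs several times.
    static final int BUFFER_SIZE = 8;

    // Same loop shape as the transfer method added in the commit.
    static void transfer(ReadableByteChannel input, WritableByteChannel output) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
        // read() returns -1 once the source is exhausted.
        while (input.read(buffer) >= 0) {
            // Switch the buffer from "being filled" to "being drained".
            buffer.flip();
            // write() may consume only part of the buffer, so repeat until empty.
            while (buffer.hasRemaining()) {
                output.write(buffer);
            }
            // Reset position and limit for the next read.
            buffer.clear();
        }
    }

    // Illustrative helper: pushes a string through the loop and reads it back.
    static String roundTrip(String text) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (ReadableByteChannel in = Channels.newChannel(
                 new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8)));
             WritableByteChannel out = Channels.newChannel(sink)) {
            transfer(in, out);
        }
        return new String(sink.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip("streamed through a small buffer"));
    }
}
```

The inner loop matters because WritableByteChannel.write is allowed to write fewer bytes than remain in the buffer; clearing the buffer before it is fully drained would drop data.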
