This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 0747f77e50b08e2248b5db7e63cc4f4cd8b91dbe Author: Tadayoshi Sato <sato.tadayo...@gmail.com> AuthorDate: Mon Jul 8 15:42:24 2019 +0900 CAMEL-13734: camel-undertow - Support streaming for producer endpoints --- .../src/main/docs/undertow-component.adoc | 2 +- .../undertow/DefaultUndertowHttpBinding.java | 17 ++- .../component/undertow/UndertowClientCallback.java | 18 +-- .../camel/component/undertow/UndertowConsumer.java | 4 +- .../camel/component/undertow/UndertowEndpoint.java | 4 +- .../camel/component/undertow/UndertowProducer.java | 117 ++++++++------- .../undertow/UndertowStreamingClientCallback.java | 61 ++++++++ .../dsl/UndertowEndpointBuilderFactory.java | 166 +++++++++++++-------- .../main/java/org/apache/camel/util/IOHelper.java | 13 ++ .../modules/ROOT/pages/undertow-component.adoc | 2 +- 10 files changed, 271 insertions(+), 133 deletions(-) diff --git a/components/camel-undertow/src/main/docs/undertow-component.adoc b/components/camel-undertow/src/main/docs/undertow-component.adoc index 612dfb6..1e08541 100644 --- a/components/camel-undertow/src/main/docs/undertow-component.adoc +++ b/components/camel-undertow/src/main/docs/undertow-component.adoc @@ -85,12 +85,12 @@ with the following path and query parameters: [width="100%",cols="2,5,^1,2",options="header"] |=== | Name | Description | Default | Type +| *useStreaming* (common) | For HTTP endpoint: if true, text and binary messages will be wrapped as java.io.InputStream before they are passed to an Exchange; otherwise they will be passed as byte. For WebSocket endpoint: if true, text and binary messages will be wrapped as java.io.Reader and java.io.InputStream respectively before they are passed to an Exchange; otherwise they will be passed as String and byte respectively. | false | boolean | *accessLog* (consumer) | Whether or not the consumer should write access log | false | Boolean | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean | *httpMethodRestrict* (consumer) | Used to only allow consuming if the HttpMethod matches, such as GET/POST/PUT etc. Multiple methods can be specified separated by comma. | | String | *matchOnUriPrefix* (consumer) | Whether or not the consumer should try to find a target consumer by matching the URI prefix if no exact match is found. | false | Boolean | *optionsEnabled* (consumer) | Specifies whether to enable HTTP OPTIONS for this Servlet consumer. By default OPTIONS is turned off. | false | boolean -| *useStreaming* (consumer) | For HTTP endpoint: if true, text and binary messages will be wrapped as java.io.InputStream before they are passed to an Exchange; otherwise they will be passed as byte. For WebSocket endpoint: if true, text and binary messages will be wrapped as java.io.Reader and java.io.InputStream respectively before they are passed to an Exchange; otherwise they will be passed as String and byte respectively. | false | boolean | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern | *cookieHandler* (producer) | Configure a cookie handler to maintain a HTTP session | | CookieHandler diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/DefaultUndertowHttpBinding.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/DefaultUndertowHttpBinding.java index 00eff07..28211de 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/DefaultUndertowHttpBinding.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/DefaultUndertowHttpBinding.java @@ -150,7 +150,22 @@ public class DefaultUndertowHttpBinding implements UndertowHttpBinding { //retrieve response headers populateCamelHeaders(clientExchange.getResponse(), result.getHeaders(), exchange); - result.setBody(readFromChannel(clientExchange.getResponseChannel())); + StreamSourceChannel source = clientExchange.getResponseChannel(); + if (useStreaming) { + // client connection can be closed only after input stream is fully read + result.setBody(new ChannelInputStream(source) { + @Override + public void close() throws IOException { + try { + super.close(); + } finally { + clientExchange.getConnection().close(); + } + } + }); + } else { + result.setBody(readFromChannel(source)); + } return result; } diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowClientCallback.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowClientCallback.java index 12cf09b..f6c3c77 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowClientCallback.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowClientCallback.java @@ -55,11 +55,11 @@ import org.xnio.channels.StreamSinkChannel; * Undertow {@link ClientCallback} that will get notified when the HTTP * connection is ready or when the client failed to connect. It will also handle * writing the request and reading the response in - * {@link #writeRequest(ClientExchange, ByteBuffer)} and + * {@link #writeRequest(ClientExchange)} and * {@link #setupResponseListener(ClientExchange)}. The main entry point is * {@link #completed(ClientConnection)} or {@link #failed(IOException)} in case * of errors, every error condition that should terminate Camel {@link Exchange} - * should go to {@link #hasFailedWith(Exception)} and successful execution of + * should go to {@link #hasFailedWith(Throwable)} and successful execution of * the exchange should end with {@link #finish(Message)}. Any * {@link ClientCallback}s that are added here should extend * {@link ErrorHandlingClientCallback}, best way to do that is to use the @@ -104,11 +104,11 @@ class UndertowClientCallback implements ClientCallback<ClientConnection> { */ private final BlockingDeque<Closeable> closables = new LinkedBlockingDeque<>(); - private final UndertowEndpoint endpoint; + protected final UndertowEndpoint endpoint; - private final Exchange exchange; + protected final Exchange exchange; - private final ClientRequest request; + protected final ClientRequest request; private final Boolean throwExceptionOnFailure; @@ -119,7 +119,7 @@ class UndertowClientCallback implements ClientCallback<ClientConnection> { this.endpoint = endpoint; this.request = request; this.body = body; - throwExceptionOnFailure = endpoint.getThrowExceptionOnFailure(); + this.throwExceptionOnFailure = endpoint.getThrowExceptionOnFailure(); } @Override @@ -186,7 +186,7 @@ class UndertowClientCallback implements ClientCallback<ClientConnection> { finish(null); } - <T> ClientCallback<T> on(final Consumer<T> consumer) { + protected <T> ClientCallback<T> on(final Consumer<T> consumer) { return new ErrorHandlingClientCallback<>(consumer); } @@ -196,7 +196,7 @@ class UndertowClientCallback implements ClientCallback<ClientConnection> { setupResponseListener(clientExchange); // write the request - writeRequest(clientExchange, body); + writeRequest(clientExchange); } void setupResponseListener(final ClientExchange clientExchange) { @@ -266,7 +266,7 @@ class UndertowClientCallback implements ClientCallback<ClientConnection> { } } - void writeRequest(final ClientExchange clientExchange, final ByteBuffer body) { + protected void writeRequest(final ClientExchange clientExchange) { final StreamSinkChannel requestChannel = clientExchange.getRequestChannel(); if (body != null) { try { diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java index 81ff826..72b9e49 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java @@ -170,7 +170,6 @@ public class UndertowConsumer extends DefaultConsumer implements HttpHandler { private void sendResponse(HttpServerExchange httpExchange, Exchange camelExchange) throws IOException, NoTypeConversionAvailableException { Object body = getResponseBody(httpExchange, camelExchange); - TypeConverter tc = getEndpoint().getCamelContext().getTypeConverter(); if (body == null) { log.trace("No payload to send as reply for exchange: {}", camelExchange); @@ -179,7 +178,7 @@ public class UndertowConsumer extends DefaultConsumer implements HttpHandler { return; } - if (body instanceof InputStream) { + if (getEndpoint().isUseStreaming() && (body instanceof InputStream)) { httpExchange.startBlocking(); try (InputStream input = (InputStream) body; OutputStream output = httpExchange.getOutputStream()) { @@ -187,6 +186,7 @@ public class UndertowConsumer extends DefaultConsumer implements HttpHandler { IOHelper.copy(input, output, IOHelper.DEFAULT_BUFFER_SIZE, true); } } else { + TypeConverter tc = getEndpoint().getCamelContext().getTypeConverter(); ByteBuffer bodyAsByteBuffer = tc.mandatoryConvertTo(ByteBuffer.class, body); httpExchange.getResponseSender().send(bodyAsByteBuffer); } diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java index e586077..83754cf 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java @@ -66,6 +66,8 @@ public class UndertowEndpoint extends DefaultEndpoint implements AsyncEndpoint, @UriPath @Metadata(required = true) private URI httpURI; + @UriParam(label = "common", defaultValue = "false") + private boolean useStreaming; @UriParam(label = "advanced") private UndertowHttpBinding undertowHttpBinding; @UriParam(label = "advanced") @@ -80,8 +82,6 @@ public class UndertowEndpoint extends DefaultEndpoint implements AsyncEndpoint, private Boolean matchOnUriPrefix = Boolean.FALSE; @UriParam(label = "consumer", defaultValue = "false") private Boolean accessLog = Boolean.FALSE; - @UriParam(label = "consumer", defaultValue = "false") - private boolean useStreaming; @UriParam(label = "producer", defaultValue = "true") private Boolean throwExceptionOnFailure = Boolean.TRUE; @UriParam(label = "producer", defaultValue = "false") diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java index af970a4..7b498a4 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java @@ -26,7 +26,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; - import javax.net.ssl.SSLContext; import io.undertow.client.ClientRequest; @@ -89,51 +88,62 @@ public class UndertowProducer extends DefaultAsyncProducer { public boolean process(final Exchange camelExchange, final AsyncCallback callback) { if (endpoint.isWebSocket()) { return processWebSocket(camelExchange, callback); - } else { - /* not a WebSocket */ - final URI uri; - final HttpString method; + } + + /* not a WebSocket */ + final URI uri; + final HttpString method; + try { + final String exchangeUri = UndertowHelper.createURL(camelExchange, getEndpoint()); + uri = UndertowHelper.createURI(camelExchange, exchangeUri, getEndpoint()); + method = UndertowHelper.createMethod(camelExchange, endpoint, camelExchange.getIn().getBody() != null); + } catch (final URISyntaxException e) { + camelExchange.setException(e); + callback.done(true); + return true; + } + + final String pathAndQuery = URISupport.pathAndQueryOf(uri); + + final UndertowHttpBinding undertowHttpBinding = endpoint.getUndertowHttpBinding(); + + final CookieHandler cookieHandler = endpoint.getCookieHandler(); + final Map<String, List<String>> cookieHeaders; + if (cookieHandler != null) { try { - final String exchangeUri = UndertowHelper.createURL(camelExchange, getEndpoint()); - uri = UndertowHelper.createURI(camelExchange, exchangeUri, getEndpoint()); - method = UndertowHelper.createMethod(camelExchange, endpoint, camelExchange.getIn().getBody() != null); - } catch (final URISyntaxException e) { + cookieHeaders = cookieHandler.loadCookies(camelExchange, uri); + } catch (final IOException e) { camelExchange.setException(e); callback.done(true); return true; } + } else { + cookieHeaders = Collections.emptyMap(); + } - final String pathAndQuery = URISupport.pathAndQueryOf(uri); - - final UndertowHttpBinding undertowHttpBinding = endpoint.getUndertowHttpBinding(); - - final CookieHandler cookieHandler = endpoint.getCookieHandler(); - final Map<String, List<String>> cookieHeaders; - if (cookieHandler != null) { - try { - cookieHeaders = cookieHandler.loadCookies(camelExchange, uri); - } catch (final IOException e) { - camelExchange.setException(e); - callback.done(true); - return true; - } - } else { - cookieHeaders = Collections.emptyMap(); - } - - final ClientRequest request = new ClientRequest(); - request.setMethod(method); - request.setPath(pathAndQuery); + final ClientRequest request = new ClientRequest(); + request.setMethod(method); + request.setPath(pathAndQuery); - final HeaderMap requestHeaders = request.getRequestHeaders(); + final HeaderMap requestHeaders = request.getRequestHeaders(); - // Set the Host header - final Message message = camelExchange.getIn(); - final String host = message.getHeader(Headers.HOST_STRING, String.class); - requestHeaders.put(Headers.HOST, Optional.ofNullable(host).orElseGet(() -> uri.getAuthority())); + // Set the Host header + final Message message = camelExchange.getIn(); + final String host = message.getHeader(Headers.HOST_STRING, String.class); + requestHeaders.put(Headers.HOST, Optional.ofNullable(host).orElseGet(uri::getAuthority)); - final Object body = undertowHttpBinding.toHttpRequest(request, camelExchange.getIn()); + cookieHeaders.forEach((key, values) -> { + requestHeaders.putAll(HttpString.tryFromString(key), values); + }); + final Object body = undertowHttpBinding.toHttpRequest(request, camelExchange.getIn()); + final UndertowClientCallback clientCallback; + if (getEndpoint().isUseStreaming() && (body instanceof InputStream)) { + // For streaming, make it chunked encoding instead of specifying content length + requestHeaders.put(Headers.TRANSFER_ENCODING, "chunked"); + clientCallback = new UndertowStreamingClientCallback(camelExchange, callback, getEndpoint(), + request, (InputStream) body); + } else { final TypeConverter tc = endpoint.getCamelContext().getTypeConverter(); final ByteBuffer bodyAsByte = tc.tryConvertTo(ByteBuffer.class, body); @@ -143,29 +153,24 @@ public class UndertowProducer extends DefaultAsyncProducer { requestHeaders.put(Headers.CONTENT_LENGTH, bodyAsByte.remaining()); } - for (final Map.Entry<String, List<String>> entry : cookieHeaders.entrySet()) { - requestHeaders.putAll(HttpString.tryFromString(entry.getKey()), entry.getValue()); - } - - if (log.isDebugEnabled()) { - log.debug("Executing http {} method: {}", method, pathAndQuery); - } - - final UndertowClientCallback clientCallback = new UndertowClientCallback(camelExchange, callback, getEndpoint(), - request, bodyAsByte); - - // when connect succeeds or fails UndertowClientCallback will - // get notified on a I/O thread run by Xnio worker. The writing - // of request and reading of response is performed also in the - // callback - client.connect(clientCallback, uri, worker, ssl, pool, options); + clientCallback = new UndertowClientCallback(camelExchange, callback, getEndpoint(), + request, bodyAsByte); + } - // the call above will proceed on Xnio I/O thread we will - // notify the exchange asynchronously when the HTTP exchange - // ends with success or failure from UndertowClientCallback - return false; + if (log.isDebugEnabled()) { + log.debug("Executing http {} method: {}", method, pathAndQuery); } + // when connect succeeds or fails UndertowClientCallback will + // get notified on a I/O thread run by Xnio worker. The writing + // of request and reading of response is performed also in the + // callback + client.connect(clientCallback, uri, worker, ssl, pool, options); + + // the call above will proceed on Xnio I/O thread we will + // notify the exchange asynchronously when the HTTP exchange + // ends with success or failure from UndertowClientCallback + return false; } private boolean processWebSocket(final Exchange camelExchange, final AsyncCallback camelCallback) { diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowStreamingClientCallback.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowStreamingClientCallback.java new file mode 100644 index 0000000..b1e2e34 --- /dev/null +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowStreamingClientCallback.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.undertow; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; + +import io.undertow.client.ClientConnection; +import io.undertow.client.ClientExchange; +import io.undertow.client.ClientRequest; +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.util.IOHelper; +import org.xnio.channels.StreamSinkChannel; + +class UndertowStreamingClientCallback extends UndertowClientCallback { + + private InputStream bodyStream; + + UndertowStreamingClientCallback(Exchange exchange, AsyncCallback callback, + UndertowEndpoint endpoint, ClientRequest request, + InputStream body) { + super(exchange, callback, endpoint, request, null); + this.bodyStream = body; + } + + @Override + public void completed(ClientConnection connection) { + // no connection closing registered as streaming continues downstream + connection.sendRequest(request, on(this::performClientExchange)); + } + + @Override + protected void writeRequest(ClientExchange clientExchange) { + StreamSinkChannel requestChannel = clientExchange.getRequestChannel(); + try (ReadableByteChannel source = Channels.newChannel(bodyStream)) { + IOHelper.transfer(source, requestChannel); + flush(requestChannel); + } catch (final IOException e) { + hasFailedWith(e); + } + } +} diff --git a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/UndertowEndpointBuilderFactory.java b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/UndertowEndpointBuilderFactory.java index 40b114f..5c70e7c 100644 --- a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/UndertowEndpointBuilderFactory.java +++ b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/UndertowEndpointBuilderFactory.java @@ -45,6 +45,41 @@ public interface UndertowEndpointBuilderFactory { return (AdvancedUndertowEndpointConsumerBuilder) this; } /** + * For HTTP endpoint: if true, text and binary messages will be wrapped + * as java.io.InputStream before they are passed to an Exchange; + * otherwise they will be passed as byte. For WebSocket endpoint: if + * true, text and binary messages will be wrapped as java.io.Reader and + * java.io.InputStream respectively before they are passed to an + * Exchange; otherwise they will be passed as String and byte + * respectively. + * + * The option is a: <code>boolean</code> type. + * + * Group: common + */ + default UndertowEndpointConsumerBuilder useStreaming( + boolean useStreaming) { + setProperty("useStreaming", useStreaming); + return this; + } + /** + * For HTTP endpoint: if true, text and binary messages will be wrapped + * as java.io.InputStream before they are passed to an Exchange; + * otherwise they will be passed as byte. For WebSocket endpoint: if + * true, text and binary messages will be wrapped as java.io.Reader and + * java.io.InputStream respectively before they are passed to an + * Exchange; otherwise they will be passed as String and byte + * respectively. + * + * The option will be converted to a <code>boolean</code> type. + * + * Group: common + */ + default UndertowEndpointConsumerBuilder useStreaming(String useStreaming) { + setProperty("useStreaming", useStreaming); + return this; + } + /** * Whether or not the consumer should write access log. * * The option is a: <code>java.lang.Boolean</code> type. @@ -171,37 +206,6 @@ public interface UndertowEndpointBuilderFactory { return this; } /** - * For HTTP endpoint: if true, text and binary messages will be wrapped - * as java.io.InputStream before they are passed to an Exchange; - * otherwise they will be passed as byte. For WebSocket endpoint: if - * true, text and binary messages will be wrapped as java.io.Reader and - * java.io.InputStream respectively before they are passed to an - * Exchange; otherwise they will be passed as String and byte - * respectively. - * The option is a <code>boolean</code> type. - * @group consumer - */ - default UndertowEndpointConsumerBuilder useStreaming( - boolean useStreaming) { - setProperty("useStreaming", useStreaming); - return this; - } - /** - * For HTTP endpoint: if true, text and binary messages will be wrapped - * as java.io.InputStream before they are passed to an Exchange; - * otherwise they will be passed as byte. For WebSocket endpoint: if - * true, text and binary messages will be wrapped as java.io.Reader and - * java.io.InputStream respectively before they are passed to an - * Exchange; otherwise they will be passed as String and byte - * respectively. - * The option will be converted to a <code>boolean</code> type. - * @group consumer - */ - default UndertowEndpointConsumerBuilder useStreaming(String useStreaming) { - setProperty("useStreaming", useStreaming); - return this; - } - /** * if true, the consumer will post notifications to the route when a new * WebSocket peer connects, disconnects, etc. See * UndertowConstants.EVENT_TYPE and EventType. @@ -230,35 +234,6 @@ public interface UndertowEndpointBuilderFactory { return this; } /** - * if true, text and binary messages coming through a WebSocket will be - * wrapped as java.io.Reader and java.io.InputStream respectively before - * they are passed to an Exchange; otherwise they will be passed as - * String and byte respectively. - * - * The option is a: <code>boolean</code> type. - * - * Group: websocket - */ - default UndertowEndpointConsumerBuilder useStreaming( - boolean useStreaming) { - setProperty("useStreaming", useStreaming); - return this; - } - /** - * if true, text and binary messages coming through a WebSocket will be - * wrapped as java.io.Reader and java.io.InputStream respectively before - * they are passed to an Exchange; otherwise they will be passed as - * String and byte respectively. - * - * The option will be converted to a <code>boolean</code> type. - * - * Group: websocket - */ - default UndertowEndpointConsumerBuilder useStreaming(String useStreaming) { - setProperty("useStreaming", useStreaming); - return this; - } - /** * To configure security using SSLContextParameters. * * The option is a: @@ -504,6 +479,41 @@ public interface UndertowEndpointBuilderFactory { return (AdvancedUndertowEndpointProducerBuilder) this; } /** + * For HTTP endpoint: if true, text and binary messages will be wrapped + * as java.io.InputStream before they are passed to an Exchange; + * otherwise they will be passed as byte. For WebSocket endpoint: if + * true, text and binary messages will be wrapped as java.io.Reader and + * java.io.InputStream respectively before they are passed to an + * Exchange; otherwise they will be passed as String and byte + * respectively. + * + * The option is a: <code>boolean</code> type. + * + * Group: common + */ + default UndertowEndpointProducerBuilder useStreaming( + boolean useStreaming) { + setProperty("useStreaming", useStreaming); + return this; + } + /** + * For HTTP endpoint: if true, text and binary messages will be wrapped + * as java.io.InputStream before they are passed to an Exchange; + * otherwise they will be passed as byte. For WebSocket endpoint: if + * true, text and binary messages will be wrapped as java.io.Reader and + * java.io.InputStream respectively before they are passed to an + * Exchange; otherwise they will be passed as String and byte + * respectively. + * + * The option will be converted to a <code>boolean</code> type. + * + * Group: common + */ + default UndertowEndpointProducerBuilder useStreaming(String useStreaming) { + setProperty("useStreaming", useStreaming); + return this; + } + /** * Configure a cookie handler to maintain a HTTP session. * * The option is a: @@ -981,6 +991,40 @@ public interface UndertowEndpointBuilderFactory { return (AdvancedUndertowEndpointBuilder) this; } /** + * For HTTP endpoint: if true, text and binary messages will be wrapped + * as java.io.InputStream before they are passed to an Exchange; + * otherwise they will be passed as byte. For WebSocket endpoint: if + * true, text and binary messages will be wrapped as java.io.Reader and + * java.io.InputStream respectively before they are passed to an + * Exchange; otherwise they will be passed as String and byte + * respectively. + * + * The option is a: <code>boolean</code> type. + * + * Group: common + */ + default UndertowEndpointBuilder useStreaming(boolean useStreaming) { + setProperty("useStreaming", useStreaming); + return this; + } + /** + * For HTTP endpoint: if true, text and binary messages will be wrapped + * as java.io.InputStream before they are passed to an Exchange; + * otherwise they will be passed as byte. For WebSocket endpoint: if + * true, text and binary messages will be wrapped as java.io.Reader and + * java.io.InputStream respectively before they are passed to an + * Exchange; otherwise they will be passed as String and byte + * respectively. + * + * The option will be converted to a <code>boolean</code> type. + * + * Group: common + */ + default UndertowEndpointBuilder useStreaming(String useStreaming) { + setProperty("useStreaming", useStreaming); + return this; + } + /** * To configure security using SSLContextParameters. * * The option is a: @@ -1178,4 +1222,4 @@ public interface UndertowEndpointBuilderFactory { } return new UndertowEndpointBuilderImpl(path); } -} +} \ No newline at end of file diff --git a/core/camel-util/src/main/java/org/apache/camel/util/IOHelper.java b/core/camel-util/src/main/java/org/apache/camel/util/IOHelper.java index 565b54c..701ee6e 100644 --- a/core/camel-util/src/main/java/org/apache/camel/util/IOHelper.java +++ b/core/camel-util/src/main/java/org/apache/camel/util/IOHelper.java @@ -37,6 +37,8 @@ import java.io.Writer; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.channels.FileChannel; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; import java.nio.charset.Charset; import java.nio.charset.UnsupportedCharsetException; import java.util.function.Supplier; @@ -251,6 +253,17 @@ public final class IOHelper { return total; } + public static void transfer(ReadableByteChannel input, WritableByteChannel output) throws IOException { + ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); + while (input.read(buffer) >= 0) { + buffer.flip(); + while (buffer.hasRemaining()) { + output.write(buffer); + } + buffer.clear(); + } + } + /** * Forces any updates to this channel's file to be written to the storage * device that contains it. diff --git a/docs/components/modules/ROOT/pages/undertow-component.adoc b/docs/components/modules/ROOT/pages/undertow-component.adoc index 480d953..612dfb6 100644 --- a/docs/components/modules/ROOT/pages/undertow-component.adoc +++ b/docs/components/modules/ROOT/pages/undertow-component.adoc @@ -90,6 +90,7 @@ with the following path and query parameters: | *httpMethodRestrict* (consumer) | Used to only allow consuming if the HttpMethod matches, such as GET/POST/PUT etc. Multiple methods can be specified separated by comma. | | String | *matchOnUriPrefix* (consumer) | Whether or not the consumer should try to find a target consumer by matching the URI prefix if no exact match is found. | false | Boolean | *optionsEnabled* (consumer) | Specifies whether to enable HTTP OPTIONS for this Servlet consumer. By default OPTIONS is turned off. | false | boolean +| *useStreaming* (consumer) | For HTTP endpoint: if true, text and binary messages will be wrapped as java.io.InputStream before they are passed to an Exchange; otherwise they will be passed as byte. For WebSocket endpoint: if true, text and binary messages will be wrapped as java.io.Reader and java.io.InputStream respectively before they are passed to an Exchange; otherwise they will be passed as String and byte respectively. | false | boolean | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern | *cookieHandler* (producer) | Configure a cookie handler to maintain a HTTP session | | CookieHandler @@ -108,7 +109,6 @@ with the following path and query parameters: | *fireWebSocketChannelEvents* (websocket) | if true, the consumer will post notifications to the route when a new WebSocket peer connects, disconnects, etc. See UndertowConstants.EVENT_TYPE and EventType. | false | boolean | *sendTimeout* (websocket) | Timeout in milliseconds when sending to a websocket channel. The default timeout is 30000 (30 seconds). | 30000 | Integer | *sendToAll* (websocket) | To send to all websocket subscribers. Can be used to configure on endpoint level, instead of having to use the UndertowConstants.SEND_TO_ALL header on the message. | | Boolean -| *useStreaming* (websocket) | if true, text and binary messages coming through a WebSocket will be wrapped as java.io.Reader and java.io.InputStream respectively before they are passed to an Exchange; otherwise they will be passed as String and byte respectively. | false | boolean | *sslContextParameters* (security) | To configure security using SSLContextParameters | | SSLContextParameters |=== // endpoint options: END