This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 72e18efd15af749bde12fe71f923d94889c01fca Author: Tadayoshi Sato <sato.tadayo...@gmail.com> AuthorDate: Mon Jul 8 14:43:45 2019 +0900 CAMEL-13734: camel-undertow - Support streaming for consumer endpoints --- .../src/main/docs/undertow-component.adoc | 2 +- .../undertow/DefaultUndertowHttpBinding.java | 14 ++++++++- .../undertow/RestUndertowHttpBinding.java | 8 ++++++ .../component/undertow/UndertowComponent.java | 2 +- .../camel/component/undertow/UndertowConsumer.java | 18 ++++++++++++ .../camel/component/undertow/UndertowEndpoint.java | 21 ++++++++++---- .../dsl/UndertowEndpointBuilderFactory.java | 33 +++++++++++++++++++++- 7 files changed, 88 insertions(+), 10 deletions(-) diff --git a/components/camel-undertow/src/main/docs/undertow-component.adoc b/components/camel-undertow/src/main/docs/undertow-component.adoc index 480d953..612dfb6 100644 --- a/components/camel-undertow/src/main/docs/undertow-component.adoc +++ b/components/camel-undertow/src/main/docs/undertow-component.adoc @@ -90,6 +90,7 @@ with the following path and query parameters: | *httpMethodRestrict* (consumer) | Used to only allow consuming if the HttpMethod matches, such as GET/POST/PUT etc. Multiple methods can be specified separated by comma. | | String | *matchOnUriPrefix* (consumer) | Whether or not the consumer should try to find a target consumer by matching the URI prefix if no exact match is found. | false | Boolean | *optionsEnabled* (consumer) | Specifies whether to enable HTTP OPTIONS for this Servlet consumer. By default OPTIONS is turned off. | false | boolean +| *useStreaming* (consumer) | For HTTP endpoint: if true, text and binary messages will be wrapped as java.io.InputStream before they are passed to an Exchange; otherwise they will be passed as byte. For WebSocket endpoint: if true, text and binary messages will be wrapped as java.io.Reader and java.io.InputStream respectively before they are passed to an Exchange; otherwise they will be passed as String and byte respectively. | false | boolean | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern | *cookieHandler* (producer) | Configure a cookie handler to maintain a HTTP session | | CookieHandler @@ -108,7 +109,6 @@ with the following path and query parameters: | *fireWebSocketChannelEvents* (websocket) | if true, the consumer will post notifications to the route when a new WebSocket peer connects, disconnects, etc. See UndertowConstants.EVENT_TYPE and EventType. | false | boolean | *sendTimeout* (websocket) | Timeout in milliseconds when sending to a websocket channel. The default timeout is 30000 (30 seconds). | 30000 | Integer | *sendToAll* (websocket) | To send to all websocket subscribers. Can be used to configure on endpoint level, instead of having to use the UndertowConstants.SEND_TO_ALL header on the message. | | Boolean -| *useStreaming* (websocket) | if true, text and binary messages coming through a WebSocket will be wrapped as java.io.Reader and java.io.InputStream respectively before they are passed to an Exchange; otherwise they will be passed as String and byte respectively. | false | boolean | *sslContextParameters* (security) | To configure security using SSLContextParameters | | SSLContextParameters |=== // endpoint options: END diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/DefaultUndertowHttpBinding.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/DefaultUndertowHttpBinding.java index 603a414..00eff07 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/DefaultUndertowHttpBinding.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/DefaultUndertowHttpBinding.java @@ -56,6 +56,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.xnio.channels.BlockingReadableByteChannel; import org.xnio.channels.StreamSourceChannel; +import org.xnio.streams.ChannelInputStream; /** * DefaultUndertowHttpBinding represent binding used by default, if user doesn't provide any. @@ -68,10 +69,16 @@ public class DefaultUndertowHttpBinding implements UndertowHttpBinding { //use default filter strategy from Camel HTTP private HeaderFilterStrategy headerFilterStrategy; private Boolean transferException; + private boolean useStreaming; public DefaultUndertowHttpBinding() { + this(false); + } + + public DefaultUndertowHttpBinding(boolean useStreaming) { this.headerFilterStrategy = new UndertowHeaderFilterStrategy(); this.transferException = Boolean.FALSE; + this.useStreaming = useStreaming; } public DefaultUndertowHttpBinding(HeaderFilterStrategy headerFilterStrategy, Boolean transferException) { @@ -123,7 +130,12 @@ public class DefaultUndertowHttpBinding implements UndertowHttpBinding { //extract body by myself if undertow parser didn't handle and the method is allowed to have one //body is extracted as byte[] then auto TypeConverter kicks in if (Methods.POST.equals(httpExchange.getRequestMethod()) || Methods.PUT.equals(httpExchange.getRequestMethod()) || Methods.PATCH.equals(httpExchange.getRequestMethod())) { - result.setBody(readFromChannel(httpExchange.getRequestChannel())); + StreamSourceChannel source = httpExchange.getRequestChannel(); + if (useStreaming) { + result.setBody(new ChannelInputStream(source)); + } else { + result.setBody(readFromChannel(source)); + } } else { result.setBody(null); } diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/RestUndertowHttpBinding.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/RestUndertowHttpBinding.java index 617ba46..24c61ac 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/RestUndertowHttpBinding.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/RestUndertowHttpBinding.java @@ -23,6 +23,14 @@ import org.apache.camel.Exchange; public class RestUndertowHttpBinding extends DefaultUndertowHttpBinding { + public RestUndertowHttpBinding() { + super(); + } + + public RestUndertowHttpBinding(boolean useStreaming) { + super(useStreaming); + } + @Override public void populateCamelHeaders(HttpServerExchange httpExchange, Map<String, Object> headersMap, Exchange exchange) throws Exception { super.populateCamelHeaders(httpExchange, headersMap, exchange); diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowComponent.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowComponent.java index ff8dc14..2b5a5bc 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowComponent.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowComponent.java @@ -246,7 +246,7 @@ public class UndertowComponent extends DefaultComponent implements RestConsumerF if (!map.containsKey("undertowHttpBinding")) { // use the rest binding, if not using a custom http binding - endpoint.setUndertowHttpBinding(new RestUndertowHttpBinding()); + endpoint.setUndertowHttpBinding(new RestUndertowHttpBinding(endpoint.isUseStreaming())); } // configure consumer properties diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java index d700bf1..81ff826 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java @@ -17,6 +17,8 @@ package org.apache.camel.component.undertow; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.net.URI; import java.nio.ByteBuffer; import java.util.Collection; @@ -37,12 +39,14 @@ import io.undertow.websockets.core.WebSocketChannel; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Message; +import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.Processor; import org.apache.camel.TypeConverter; import org.apache.camel.component.undertow.UndertowConstants.EventType; import org.apache.camel.component.undertow.handlers.CamelWebSocketHandler; import org.apache.camel.support.DefaultConsumer; import org.apache.camel.util.CollectionStringBuffer; +import org.apache.camel.util.IOHelper; import org.apache.camel.util.ObjectHelper; /** @@ -161,6 +165,10 @@ public class UndertowConsumer extends DefaultConsumer implements HttpHandler { doneUoW(camelExchange); } + sendResponse(httpExchange, camelExchange); + } + + private void sendResponse(HttpServerExchange httpExchange, Exchange camelExchange) throws IOException, NoTypeConversionAvailableException { Object body = getResponseBody(httpExchange, camelExchange); TypeConverter tc = getEndpoint().getCamelContext().getTypeConverter(); @@ -168,6 +176,16 @@ public class UndertowConsumer extends DefaultConsumer implements HttpHandler { log.trace("No payload to send as reply for exchange: {}", camelExchange); httpExchange.getResponseHeaders().put(ExchangeHeaders.CONTENT_TYPE, MimeMappings.DEFAULT_MIME_MAPPINGS.get("txt")); httpExchange.getResponseSender().send("No response available"); + return; + } + + if (body instanceof InputStream) { + httpExchange.startBlocking(); + try (InputStream input = (InputStream) body; + OutputStream output = httpExchange.getOutputStream()) { + // flush on each write so that it won't cause OutOfMemoryError + IOHelper.copy(input, output, IOHelper.DEFAULT_BUFFER_SIZE, true); + } } else { ByteBuffer bodyAsByteBuffer = tc.mandatoryConvertTo(ByteBuffer.class, body); httpExchange.getResponseSender().send(bodyAsByteBuffer); diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java index d1ab5b1..e586077 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java @@ -80,6 +80,8 @@ public class UndertowEndpoint extends DefaultEndpoint implements AsyncEndpoint, private Boolean matchOnUriPrefix = Boolean.FALSE; @UriParam(label = "consumer", defaultValue = "false") private Boolean accessLog = Boolean.FALSE; + @UriParam(label = "consumer", defaultValue = "false") + private boolean useStreaming; @UriParam(label = "producer", defaultValue = "true") private Boolean throwExceptionOnFailure = Boolean.TRUE; @UriParam(label = "producer", defaultValue = "false") @@ -101,8 +103,6 @@ public class UndertowEndpoint extends DefaultEndpoint implements AsyncEndpoint, @UriParam(label = "producer,websocket", defaultValue = "30000") private Integer sendTimeout = 30000; @UriParam(label = "consumer,websocket", defaultValue = "false") - private boolean useStreaming; - @UriParam(label = "consumer,websocket", defaultValue = "false") private boolean fireWebSocketChannelEvents; public UndertowEndpoint(String uri, UndertowComponent component) { @@ -250,7 +250,7 @@ public class UndertowEndpoint extends DefaultEndpoint implements AsyncEndpoint, public UndertowHttpBinding getUndertowHttpBinding() { if (undertowHttpBinding == null) { // create a new binding and use the options from this endpoint - undertowHttpBinding = new DefaultUndertowHttpBinding(); + undertowHttpBinding = new DefaultUndertowHttpBinding(useStreaming); undertowHttpBinding.setHeaderFilterStrategy(getHeaderFilterStrategy()); undertowHttpBinding.setTransferException(getTransferException()); } @@ -360,9 +360,18 @@ public class UndertowEndpoint extends DefaultEndpoint implements AsyncEndpoint, } /** - * if {@code true}, text and binary messages coming through a WebSocket will be wrapped as java.io.Reader and - * java.io.InputStream respectively before they are passed to an {@link Exchange}; otherwise they will be passed as - * String and byte[] respectively. + * <p> + * For HTTP endpoint: + * if {@code true}, text and binary messages will be wrapped as {@link java.io.InputStream} + * before they are passed to an {@link Exchange}; otherwise they will be passed as byte[]. + * </p> + * + * <p> + * For WebSocket endpoint: + * if {@code true}, text and binary messages will be wrapped as {@link java.io.Reader} and + * {@link java.io.InputStream} respectively before they are passed to an {@link Exchange}; + * otherwise they will be passed as String and byte[] respectively. + * </p> */ public void setUseStreaming(boolean useStreaming) { this.useStreaming = useStreaming; diff --git a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/UndertowEndpointBuilderFactory.java b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/UndertowEndpointBuilderFactory.java index 972fc71..40b114f 100644 --- a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/UndertowEndpointBuilderFactory.java +++ b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/UndertowEndpointBuilderFactory.java @@ -171,6 +171,37 @@ public interface UndertowEndpointBuilderFactory { return this; } /** + * For HTTP endpoint: if true, text and binary messages will be wrapped + * as java.io.InputStream before they are passed to an Exchange; + * otherwise they will be passed as byte. For WebSocket endpoint: if + * true, text and binary messages will be wrapped as java.io.Reader and + * java.io.InputStream respectively before they are passed to an + * Exchange; otherwise they will be passed as String and byte + * respectively. + * The option is a <code>boolean</code> type. + * @group consumer + */ + default UndertowEndpointConsumerBuilder useStreaming( + boolean useStreaming) { + setProperty("useStreaming", useStreaming); + return this; + } + /** + * For HTTP endpoint: if true, text and binary messages will be wrapped + * as java.io.InputStream before they are passed to an Exchange; + * otherwise they will be passed as byte. For WebSocket endpoint: if + * true, text and binary messages will be wrapped as java.io.Reader and + * java.io.InputStream respectively before they are passed to an + * Exchange; otherwise they will be passed as String and byte + * respectively. + * The option will be converted to a <code>boolean</code> type. + * @group consumer + */ + default UndertowEndpointConsumerBuilder useStreaming(String useStreaming) { + setProperty("useStreaming", useStreaming); + return this; + } + /** * if true, the consumer will post notifications to the route when a new * WebSocket peer connects, disconnects, etc. See * UndertowConstants.EVENT_TYPE and EventType. @@ -1147,4 +1178,4 @@ public interface UndertowEndpointBuilderFactory { } return new UndertowEndpointBuilderImpl(path); } -} \ No newline at end of file +}