This is an automated email from the ASF dual-hosted git repository. tsato pushed a commit to branch camel-2.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.x by this push: new bc4755c CAMEL-13734: camel-undertow - Fix one-way streaming for producer with tests bc4755c is described below commit bc4755c25aae841d66519525d68a57f39f441472 Author: Tadayoshi Sato <sato.tadayo...@gmail.com> AuthorDate: Thu Jul 18 13:41:33 2019 +0900 CAMEL-13734: camel-undertow - Fix one-way streaming for producer with tests (cherry picked from commit 5190b410c85a8169ee6a7eb24db8df4ac050c979) --- .../component/undertow/UndertowClientCallback.java | 18 ++-- .../camel/component/undertow/UndertowProducer.java | 13 ++- .../undertow/UndertowStreamingClientCallback.java | 26 ++++- .../undertow/UndertowHttpStreamingTest.java | 119 +++++++++++++++++++++ 4 files changed, 163 insertions(+), 13 deletions(-) diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowClientCallback.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowClientCallback.java index 21a4171..4eac8e7 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowClientCallback.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowClientCallback.java @@ -100,15 +100,15 @@ class UndertowClientCallback implements ClientCallback<ClientConnection> { protected final ClientRequest request; - private final ByteBuffer body; - - private final AsyncCallback callback; + protected final AsyncCallback callback; /** * A queue of resources that will be closed when the exchange ends, add more * resources via {@link #deferClose(Closeable)}. */ - private final BlockingDeque<Closeable> closables = new LinkedBlockingDeque<>(); + protected final BlockingDeque<Closeable> closables = new LinkedBlockingDeque<>(); + + private final ByteBuffer body; private final Boolean throwExceptionOnFailure; @@ -161,9 +161,13 @@ class UndertowClientCallback implements ClientCallback<ClientConnection> { } } - void finish(final Message result) { - for (final Closeable closeable : closables) { - IoUtils.safeClose(closeable); + protected void finish(final Message result) { + finish(result, true); + } + + protected void finish(final Message result, boolean close) { + if (close) { + closables.forEach(IoUtils::safeClose); } if (result != null) { diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java index 279a350..2a7a5bd 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java @@ -141,7 +141,8 @@ public class UndertowProducer extends DefaultAsyncProducer { final Object body = undertowHttpBinding.toHttpRequest(request, camelExchange.getIn()); final UndertowClientCallback clientCallback; - if (getEndpoint().isUseStreaming() && (body instanceof InputStream)) { + final boolean streaming = getEndpoint().isUseStreaming(); + if (streaming && (body instanceof InputStream)) { // For streaming, make it chunked encoding instead of specifying content length requestHeaders.put(Headers.TRANSFER_ENCODING, "chunked"); clientCallback = new UndertowStreamingClientCallback(camelExchange, callback, getEndpoint(), @@ -156,8 +157,14 @@ public class UndertowProducer extends DefaultAsyncProducer { requestHeaders.put(Headers.CONTENT_LENGTH, bodyAsByte.remaining()); } - clientCallback = new UndertowClientCallback(camelExchange, callback, getEndpoint(), - request, bodyAsByte); + if (streaming) { + // response may receive streaming + clientCallback = new UndertowStreamingClientCallback(camelExchange, callback, getEndpoint(), + request, bodyAsByte); + } else { + clientCallback = new UndertowClientCallback(camelExchange, callback, getEndpoint(), + request, bodyAsByte); + } } if (log.isDebugEnabled()) { diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowStreamingClientCallback.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowStreamingClientCallback.java index b1e2e34..af87ef7 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowStreamingClientCallback.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowStreamingClientCallback.java @@ -28,7 +28,10 @@ import io.undertow.client.ClientExchange; import io.undertow.client.ClientRequest; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.IOHelper; +import org.xnio.IoUtils; import org.xnio.channels.StreamSinkChannel; class UndertowStreamingClientCallback extends UndertowClientCallback { @@ -37,19 +40,36 @@ class UndertowStreamingClientCallback extends UndertowClientCallback { UndertowStreamingClientCallback(Exchange exchange, AsyncCallback callback, UndertowEndpoint endpoint, ClientRequest request, + ByteBuffer body) { + super(exchange, callback, endpoint, request, body); + this.bodyStream = null; + } + + UndertowStreamingClientCallback(Exchange exchange, AsyncCallback callback, + UndertowEndpoint endpoint, ClientRequest request, InputStream body) { super(exchange, callback, endpoint, request, null); this.bodyStream = body; } @Override - public void completed(ClientConnection connection) { - // no connection closing registered as streaming continues downstream - connection.sendRequest(request, on(this::performClientExchange)); + protected void finish(Message result) { + boolean close = true; + if (result != null && result.getBody() instanceof InputStream) { + // no connection closing as streaming continues downstream + close = false; + } + finish(result, close); } @Override protected void writeRequest(ClientExchange clientExchange) { + if (bodyStream == null) { + super.writeRequest(clientExchange); + return; + } + + // send request stream StreamSinkChannel requestChannel = clientExchange.getRequestChannel(); try (ReadableByteChannel source = Channels.newChannel(bodyStream)) { IOHelper.transfer(source, requestChannel); diff --git a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowHttpStreamingTest.java b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowHttpStreamingTest.java new file mode 100644 index 0000000..2c1d938 --- /dev/null +++ b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowHttpStreamingTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.undertow; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.util.Collections; +import java.util.stream.Collectors; +import java.util.stream.LongStream; + +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.DefaultExchange; +import org.junit.Test; + +public class UndertowHttpStreamingTest extends BaseUndertowTest { + + private static final String LINE = + String.join("", Collections.nCopies(100, "0123456789")); + private static final long COUNT = 1000; // approx. 1MB + + @Test + public void testTwoWayStreaming() throws Exception { + long expectedLength = LINE.length() * COUNT; + MockEndpoint mock = getMockEndpoint("mock:length"); + mock.expectedMessageCount(1); + mock.expectedBodiesReceived(expectedLength); + + Exchange response = template.send( + "undertow:http://localhost:{{port}}?useStreaming=true", + e -> produceStream(e)); + consumeStream(response); + long length = response.getIn().getBody(Long.class).longValue(); + + mock.assertIsSatisfied(); + assertEquals(expectedLength, length); + } + + @Test + public void testOneWayStreaming() throws Exception { + long expectedLength = LINE.length() * COUNT; + MockEndpoint mock = getMockEndpoint("mock:length"); + mock.expectedMessageCount(1); + mock.expectedBodiesReceived(12); + + Exchange response = template.send( + "undertow:http://localhost:{{port}}?useStreaming=true", + e -> { e.getIn().setBody("Hello Camel!"); }); + consumeStream(response); + long length = response.getIn().getBody(Long.class).longValue(); + + mock.assertIsSatisfied(); + assertEquals(expectedLength, length); + } + + private static void produceStream(Exchange exchange) throws IOException { + PipedOutputStream out = new PipedOutputStream(); + exchange.getIn().setBody(new PipedInputStream(out)); + new Thread(() -> { + try (OutputStreamWriter osw = new OutputStreamWriter(out); + BufferedWriter writer = new BufferedWriter(osw)) { + LongStream.range(0, COUNT).forEach(i -> { + try { + writer.write(LINE); + writer.newLine(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } catch (IOException e) { + e.printStackTrace(); + } + }).start(); + } + + private static void consumeStream(Exchange exchange) throws IOException { + try (InputStream in = exchange.getIn().getBody(InputStream.class); + BufferedReader reader = new BufferedReader(new InputStreamReader(in))) { + long length = reader.lines() + .collect(Collectors.summingLong(String::length)); + exchange.getIn().setBody(length); + } + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("undertow:http://localhost:{{port}}?useStreaming=true") + .process(e -> consumeStream(e)) + .to("mock:length") + .process(e -> produceStream(e)); + } + }; + } + +}