CAMEL-6483: Optimized camel-jetty writing response. As well IOHelper copy streams if using byte arrays as input.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b5cddbb2 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b5cddbb2 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b5cddbb2 Branch: refs/heads/master Commit: b5cddbb2c1723502abdb2f91c3efa0203db0ca53 Parents: c83e23c Author: Claus Ibsen <[email protected]> Authored: Mon Jun 24 16:13:35 2013 +0200 Committer: Claus Ibsen <[email protected]> Committed: Tue Jun 25 07:39:14 2013 +0200 ---------------------------------------------------------------------- .../converter/stream/CachedOutputStream.java | 4 ++ .../java/org/apache/camel/util/IOHelper.java | 41 ++++++++++++++++---- .../component/http/DefaultHttpBinding.java | 39 ++++++++++++++----- .../jetty/SimpleJettyChunkedFalseTest.java | 40 +++++++++++++++++++ 4 files changed, 108 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/b5cddbb2/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java index 4951c90..a2cf0a1 100644 --- a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java +++ b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java @@ -131,6 +131,10 @@ public class CachedOutputStream extends OutputStream { return currentStream.hashCode(); } + public OutputStream getCurrentStream() { + return currentStream; + } + public String toString() { return "CachedOutputStream[size: " + totalLength + "]"; } http://git-wip-us.apache.org/repos/asf/camel/blob/b5cddbb2/camel-core/src/main/java/org/apache/camel/util/IOHelper.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/IOHelper.java b/camel-core/src/main/java/org/apache/camel/util/IOHelper.java index ca4b82f..15278e2 100644 --- a/camel-core/src/main/java/org/apache/camel/util/IOHelper.java +++ b/camel-core/src/main/java/org/apache/camel/util/IOHelper.java @@ -20,6 +20,8 @@ import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.BufferedReader; import java.io.BufferedWriter; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.FileOutputStream; import java.io.IOException; @@ -34,6 +36,7 @@ import java.nio.charset.Charset; import java.nio.charset.UnsupportedCharsetException; import org.apache.camel.Exchange; +import org.apache.camel.converter.stream.CachedOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -162,14 +165,32 @@ public final class IOHelper { public static int copy(InputStream input, OutputStream output) throws IOException { return copy(input, output, DEFAULT_BUFFER_SIZE); } - + public static int copy(final InputStream input, final OutputStream output, int bufferSize) throws IOException { - int avail = input.available(); - if (avail > 262144) { - avail = 262144; + return copy(input, output, bufferSize, false); + } + + public static int copy(final InputStream input, final OutputStream output, int bufferSize, boolean flushOnEachWrite) throws IOException { + if (input instanceof ByteArrayInputStream) { + // optimized for byte array as we only need the max size it can be + input.mark(0); + input.reset(); + bufferSize = input.available(); + } else { + int avail = input.available(); + if (avail > bufferSize) { + bufferSize = avail; + } } - if (avail > bufferSize) { - bufferSize = avail; + + if (bufferSize > 262144) { + // upper cap to avoid buffers too big + bufferSize = 262144; + } + + if (LOG.isTraceEnabled()) { + LOG.trace("Copying InputStream: {} -> OutputStream: {} with buffer: {} and flush on each write {}", + new Object[]{input, output, bufferSize, flushOnEachWrite}); } final byte[] buffer = new byte[bufferSize]; @@ -177,10 +198,16 @@ public final class IOHelper { int total = 0; while (-1 != n) { output.write(buffer, 0, n); + if (flushOnEachWrite) { + output.flush(); + } total += n; n = input.read(buffer); } - output.flush(); + if (!flushOnEachWrite) { + // flush at end, if we didn't do it during the writing + output.flush(); + } return total; } http://git-wip-us.apache.org/repos/asf/camel/blob/b5cddbb2/components/camel-http/src/main/java/org/apache/camel/component/http/DefaultHttpBinding.java ---------------------------------------------------------------------- diff --git a/components/camel-http/src/main/java/org/apache/camel/component/http/DefaultHttpBinding.java b/components/camel-http/src/main/java/org/apache/camel/component/http/DefaultHttpBinding.java index e291480..f58b4e1 100644 --- a/components/camel-http/src/main/java/org/apache/camel/component/http/DefaultHttpBinding.java +++ b/components/camel-http/src/main/java/org/apache/camel/component/http/DefaultHttpBinding.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.http; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -308,10 +309,11 @@ public class DefaultHttpBinding implements HttpBinding { return false; } - protected void copyStream(InputStream is, OutputStream os) throws IOException { + protected int copyStream(InputStream is, OutputStream os, int bufferSize) throws IOException { try { - // copy directly from input stream to output stream - IOHelper.copy(is, os); + // copy stream, and must flush on each write as etc Jetty has better performance when + // flushing after writing to its servlet output stream + return IOHelper.copy(is, os, bufferSize, true); } finally { IOHelper.close(os, is); } @@ -344,31 +346,47 @@ public class DefaultHttpBinding implements HttpBinding { if (is != null) { ServletOutputStream os = response.getOutputStream(); - LOG.trace("Writing direct response from source input stream to servlet output stream"); if (!checkChunked(message, exchange)) { CachedOutputStream stream = new CachedOutputStream(exchange); try { // copy directly from input stream to the cached output stream to get the content length - int len = IOHelper.copy(is, stream); + int len = copyStream(is, stream, response.getBufferSize()); // we need to setup the length if message is not chucked response.setContentLength(len); - copyStream(stream.getInputStream(), os); + OutputStream current = stream.getCurrentStream(); + if (current instanceof ByteArrayOutputStream) { + if (LOG.isDebugEnabled()) { + LOG.debug("Streaming (direct) response in non-chunked mode with content-length {}"); + } + ByteArrayOutputStream bos = (ByteArrayOutputStream) current; + bos.writeTo(os); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Streaming response in non-chunked mode with content-length {} and buffer size: {}", len, len); + } + copyStream(stream.getInputStream(), os, len); + } } finally { - IOHelper.close(is, stream); + IOHelper.close(is, os); } } else { - copyStream(is, os); + if (LOG.isDebugEnabled()) { + LOG.debug("Streaming response in chunked mode with buffer size {}", response.getBufferSize()); + } + copyStream(is, os, response.getBufferSize()); } } else { // not convertable as a stream so fallback as a String String data = message.getBody(String.class); if (data != null) { - LOG.debug("Cannot write from source input stream, falling back to using String content. For binary content this can be a problem."); // set content length and encoding before we write data String charset = IOHelper.getCharsetName(exchange, true); final int dataByteLength = data.getBytes(charset).length; response.setCharacterEncoding(charset); response.setContentLength(dataByteLength); + if (LOG.isDebugEnabled()) { + LOG.debug("Writing response in non-chunked mode as plain text with content-length {} and buffer size: {}", dataByteLength, response.getBufferSize()); + } try { response.getWriter().print(data); } finally { @@ -403,6 +421,9 @@ public class DefaultHttpBinding implements HttpBinding { byte[] data = GZIPHelper.compressGZIP(bytes); ServletOutputStream os = response.getOutputStream(); try { + if (LOG.isDebugEnabled()) { + LOG.debug("Streaming response as GZIP in non-chunked mode with content-length {} and buffer size: {}", data.length, response.getBufferSize()); + } response.setContentLength(data.length); os.write(data); os.flush(); http://git-wip-us.apache.org/repos/asf/camel/blob/b5cddbb2/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/SimpleJettyChunkedFalseTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/SimpleJettyChunkedFalseTest.java b/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/SimpleJettyChunkedFalseTest.java new file mode 100644 index 0000000..56114d2 --- /dev/null +++ b/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/SimpleJettyChunkedFalseTest.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jetty; + +import org.apache.camel.builder.RouteBuilder; +import org.junit.Test; + +public class SimpleJettyChunkedFalseTest extends BaseJettyTest { + + @Test + public void testSimple() throws Exception { + String result = template.requestBody("http://localhost:{{port}}/myapp", "Camel", String.class); + assertEquals("Hello Camel", result); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + from("jetty:http://localhost:{{port}}/myapp?chunked=false") + .transform(body().prepend("Hello ")); + } + }; + } + +}
