Repository: camel Updated Branches: refs/heads/master 19405559c -> af73eb8fe
CAMEL-9274: More data formats support stream caching directly. Thanks to Aaron Whiteside for the patch. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/af73eb8f Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/af73eb8f Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/af73eb8f Branch: refs/heads/master Commit: af73eb8fe3b695f4c8f8c2a1d1f7818af99261a5 Parents: 1940555 Author: Claus Ibsen <davscl...@apache.org> Authored: Thu Nov 19 08:48:53 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Nov 19 08:48:53 2015 +0100 ---------------------------------------------------------------------- .../org/apache/camel/impl/GzipDataFormat.java | 18 ++++++------- .../camel/component/gson/GsonDataFormat.java | 28 +++++++++----------- .../dataformat/protobuf/ProtobufDataFormat.java | 10 +++---- .../dataformat/tarfile/TarFileDataFormat.java | 26 +++++++++--------- .../camel/dataformat/tarfile/TarIterator.java | 9 +++---- 5 files changed, 42 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/af73eb8f/camel-core/src/main/java/org/apache/camel/impl/GzipDataFormat.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/GzipDataFormat.java b/camel-core/src/main/java/org/apache/camel/impl/GzipDataFormat.java index 66f7a7f..0647c36 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/GzipDataFormat.java +++ b/camel-core/src/main/java/org/apache/camel/impl/GzipDataFormat.java @@ -16,13 +16,13 @@ */ package org.apache.camel.impl; -import java.io.ByteArrayOutputStream; import java.io.InputStream; import java.io.OutputStream; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; import org.apache.camel.Exchange; +import org.apache.camel.converter.stream.OutputStreamBuilder; import org.apache.camel.spi.DataFormat; import org.apache.camel.spi.DataFormatName; import org.apache.camel.util.IOHelper; @@ -37,7 +37,7 @@ public class GzipDataFormat extends org.apache.camel.support.ServiceSupport impl return "gzip"; } - public void marshal(Exchange exchange, Object graph, OutputStream stream) throws Exception { + public void marshal(final Exchange exchange, final Object graph, final OutputStream stream) throws Exception { InputStream is = exchange.getContext().getTypeConverter().mandatoryConvertTo(InputStream.class, exchange, graph); GZIPOutputStream zipOutput = new GZIPOutputStream(stream); @@ -49,19 +49,17 @@ public class GzipDataFormat extends org.apache.camel.support.ServiceSupport impl } } - public Object unmarshal(Exchange exchange, InputStream stream) throws Exception { - InputStream is = exchange.getIn().getMandatoryBody(InputStream.class); + public Object unmarshal(final Exchange exchange, final InputStream inputStream) throws Exception { GZIPInputStream unzipInput = null; - // Create an expandable byte array to hold the inflated data - ByteArrayOutputStream bos = new ByteArrayOutputStream(); + OutputStreamBuilder osb = OutputStreamBuilder.withExchange(exchange); try { - unzipInput = new GZIPInputStream(is); - IOHelper.copy(unzipInput, bos); - return bos.toByteArray(); + unzipInput = new GZIPInputStream(inputStream); + IOHelper.copy(unzipInput, osb); + return osb.build(); } finally { // must close all input streams - IOHelper.close(unzipInput, is); + IOHelper.close(osb, unzipInput, inputStream); } } http://git-wip-us.apache.org/repos/asf/camel/blob/af73eb8f/components/camel-gson/src/main/java/org/apache/camel/component/gson/GsonDataFormat.java ---------------------------------------------------------------------- diff --git a/components/camel-gson/src/main/java/org/apache/camel/component/gson/GsonDataFormat.java b/components/camel-gson/src/main/java/org/apache/camel/component/gson/GsonDataFormat.java index 86948cc..8a49dd8 100644 --- a/components/camel-gson/src/main/java/org/apache/camel/component/gson/GsonDataFormat.java +++ b/components/camel-gson/src/main/java/org/apache/camel/component/gson/GsonDataFormat.java @@ -122,25 +122,23 @@ public class GsonDataFormat extends ServiceSupport implements DataFormat, DataFo } @Override - public void marshal(Exchange exchange, Object graph, OutputStream stream) throws Exception { - BufferedWriter writer = IOHelper.buffered(new OutputStreamWriter(stream, IOHelper.getCharsetName(exchange))); - gson.toJson(graph, writer); - writer.close(); + public void marshal(final Exchange exchange, final Object graph, final OutputStream stream) throws Exception { + try (final OutputStreamWriter osw = new OutputStreamWriter(stream, IOHelper.getCharsetName(exchange)); + final BufferedWriter writer = IOHelper.buffered(osw)) { + gson.toJson(graph, writer); + } } @Override - public Object unmarshal(Exchange exchange, InputStream stream) throws Exception { - BufferedReader reader = IOHelper.buffered(new InputStreamReader(stream, IOHelper.getCharsetName(exchange))); - Object result = null; - - if (this.unmarshalGenericType != null) { - result = gson.fromJson(reader, this.unmarshalGenericType); - } else { - result = gson.fromJson(reader, this.unmarshalType); + public Object unmarshal(final Exchange exchange, final InputStream stream) throws Exception { + try (final InputStreamReader isr = new InputStreamReader(stream, IOHelper.getCharsetName(exchange)); + final BufferedReader reader = IOHelper.buffered(isr)) { + if (unmarshalGenericType == null) { + return gson.fromJson(reader, unmarshalType); + } else { + return gson.fromJson(reader, unmarshalGenericType); + } } - - reader.close(); - return result; } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/af73eb8f/components/camel-protobuf/src/main/java/org/apache/camel/dataformat/protobuf/ProtobufDataFormat.java ---------------------------------------------------------------------- diff --git a/components/camel-protobuf/src/main/java/org/apache/camel/dataformat/protobuf/ProtobufDataFormat.java b/components/camel-protobuf/src/main/java/org/apache/camel/dataformat/protobuf/ProtobufDataFormat.java index 485ad8c..dcd7f4c 100755 --- a/components/camel-protobuf/src/main/java/org/apache/camel/dataformat/protobuf/ProtobufDataFormat.java +++ b/components/camel-protobuf/src/main/java/org/apache/camel/dataformat/protobuf/ProtobufDataFormat.java @@ -80,7 +80,7 @@ public class ProtobufDataFormat extends ServiceSupport implements DataFormat, Da * @see org.apache.camel.spi.DataFormat#marshal(org.apache.camel.Exchange, * java.lang.Object, java.io.OutputStream) */ - public void marshal(Exchange exchange, Object graph, OutputStream outputStream) throws Exception { + public void marshal(final Exchange exchange, final Object graph, final OutputStream outputStream) throws Exception { ((Message)graph).writeTo(outputStream); } @@ -89,7 +89,7 @@ public class ProtobufDataFormat extends ServiceSupport implements DataFormat, Da * @see org.apache.camel.spi.DataFormat#unmarshal(org.apache.camel.Exchange, * java.io.InputStream) */ - public Object unmarshal(Exchange exchange, InputStream inputStream) throws Exception { + public Object unmarshal(final Exchange exchange, final InputStream inputStream) throws Exception { ObjectHelper.notNull(defaultInstance, "defaultInstance or instanceClassName must be set", this); Builder builder = defaultInstance.newBuilderForType().mergeFrom(inputStream); @@ -101,13 +101,13 @@ public class ProtobufDataFormat extends ServiceSupport implements DataFormat, Da return builder.build(); } - protected Message loadDefaultInstance(String className, CamelContext context) throws CamelException, ClassNotFoundException { + protected Message loadDefaultInstance(final String className, final CamelContext context) throws CamelException, ClassNotFoundException { Class<?> instanceClass = context.getClassResolver().resolveMandatoryClass(className); if (Message.class.isAssignableFrom(instanceClass)) { try { Method method = instanceClass.getMethod("getDefaultInstance"); - return (Message) method.invoke(null, new Object[0]); - } catch (Exception ex) { + return (Message) method.invoke(null); + } catch (final Exception ex) { throw new CamelException("Can't set the defaultInstance of ProtobufferDataFormat with " + className + ", caused by " + ex); } http://git-wip-us.apache.org/repos/asf/camel/blob/af73eb8f/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarFileDataFormat.java ---------------------------------------------------------------------- diff --git a/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarFileDataFormat.java b/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarFileDataFormat.java index e195699..5465edc 100644 --- a/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarFileDataFormat.java +++ b/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarFileDataFormat.java @@ -17,12 +17,12 @@ package org.apache.camel.dataformat.tarfile; import java.io.BufferedInputStream; -import java.io.ByteArrayOutputStream; -import java.io.File; import java.io.InputStream; import java.io.OutputStream; +import java.nio.file.Paths; import org.apache.camel.Exchange; +import org.apache.camel.converter.stream.OutputStreamBuilder; import org.apache.camel.spi.DataFormat; import org.apache.camel.spi.DataFormatName; import org.apache.camel.support.ServiceSupport; @@ -49,14 +49,14 @@ public class TarFileDataFormat extends ServiceSupport implements DataFormat, Dat } @Override - public void marshal(Exchange exchange, Object graph, OutputStream stream) throws Exception { + public void marshal(final Exchange exchange, final Object graph, final OutputStream stream) throws Exception { String filename = exchange.getIn().getHeader(FILE_NAME, String.class); Long filelength = exchange.getIn().getHeader(FILE_LENGTH, Long.class); - if (filename != null) { - filename = new File(filename).getName(); // remove any path elements - } else { + if (filename == null) { // generate the file name as the camel file component would do filename = StringHelper.sanitize(exchange.getIn().getMessageId()); + } else { + filename = Paths.get(filename).getFileName().toString(); // remove any path elements } TarArchiveOutputStream tos = new TarArchiveOutputStream(stream); @@ -84,19 +84,19 @@ public class TarFileDataFormat extends ServiceSupport implements DataFormat, Dat } @Override - public Object unmarshal(Exchange exchange, InputStream stream) throws Exception { + public Object unmarshal(final Exchange exchange, final InputStream stream) throws Exception { if (usingIterator) { return new TarIterator(exchange.getIn(), stream); } else { - InputStream is = exchange.getIn().getMandatoryBody(InputStream.class); - TarArchiveInputStream tis = (TarArchiveInputStream) new ArchiveStreamFactory().createArchiveInputStream(ArchiveStreamFactory.TAR, new BufferedInputStream(is)); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); + BufferedInputStream bis = new BufferedInputStream(stream); + TarArchiveInputStream tis = (TarArchiveInputStream) new ArchiveStreamFactory().createArchiveInputStream(ArchiveStreamFactory.TAR, bis); + OutputStreamBuilder osb = OutputStreamBuilder.withExchange(exchange); try { TarArchiveEntry entry = tis.getNextTarEntry(); if (entry != null) { exchange.getOut().setHeader(FILE_NAME, entry.getName()); - IOHelper.copy(tis, baos); + IOHelper.copy(tis, osb); } entry = tis.getNextTarEntry(); @@ -104,10 +104,10 @@ public class TarFileDataFormat extends ServiceSupport implements DataFormat, Dat throw new IllegalStateException("Tar file has more than 1 entry."); } - return baos.toByteArray(); + return osb.build(); } finally { - IOHelper.close(tis, baos); + IOHelper.close(osb, tis, bis); } } } http://git-wip-us.apache.org/repos/asf/camel/blob/af73eb8f/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarIterator.java ---------------------------------------------------------------------- diff --git a/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarIterator.java b/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarIterator.java index 943ea44..91933c0 100644 --- a/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarIterator.java +++ b/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarIterator.java @@ -108,10 +108,8 @@ public class TarIterator implements Iterator<Message>, Closeable { } private Message getNextElement() { - Message answer = null; - if (tarInputStream == null) { - return answer; + return null; } try { @@ -119,7 +117,7 @@ public class TarIterator implements Iterator<Message>, Closeable { if (current != null) { LOGGER.debug("Reading tarEntry {}", current.getName()); - answer = new DefaultMessage(); + Message answer = new DefaultMessage(); answer.getHeaders().putAll(inputMessage.getHeaders()); answer.setHeader(TARFILE_ENTRY_NAME_HEADER, current.getName()); answer.setHeader(Exchange.FILE_NAME, current.getName()); @@ -132,13 +130,12 @@ public class TarIterator implements Iterator<Message>, Closeable { return answer; } else { LOGGER.trace("Closed tarInputStream"); + return null; } } catch (IOException exception) { //Just wrap the IOException as CamelRuntimeException throw new RuntimeCamelException(exception); } - - return answer; } public void checkNullAnswer(Message answer) {