CAMEL-9375: TarSplitter includes one extra empty entry at the end.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/43775d08 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/43775d08 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/43775d08 Branch: refs/heads/camel-2.16.x Commit: 43775d08452dce6dfca8f6db1f8a80504fd0f5f9 Parents: d84c66f Author: Sami Nurminen <snurm...@gmail.com> Authored: Sat Dec 19 13:48:27 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Dec 20 07:58:11 2015 +0100 ---------------------------------------------------------------------- .../dataformat/tarfile/TarFileDataFormat.java | 2 +- .../camel/dataformat/tarfile/TarIterator.java | 93 +++++++++----------- .../camel/dataformat/tarfile/TarSplitter.java | 2 +- 3 files changed, 45 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/43775d08/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarFileDataFormat.java ---------------------------------------------------------------------- diff --git a/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarFileDataFormat.java b/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarFileDataFormat.java index e195699..94f2faa 100644 --- a/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarFileDataFormat.java +++ b/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarFileDataFormat.java @@ -86,7 +86,7 @@ public class TarFileDataFormat extends ServiceSupport implements DataFormat, Dat @Override public Object unmarshal(Exchange exchange, InputStream stream) throws Exception { if (usingIterator) { - return new TarIterator(exchange.getIn(), stream); + return new TarIterator(exchange, stream); } else { InputStream is = exchange.getIn().getMandatoryBody(InputStream.class); TarArchiveInputStream tis = (TarArchiveInputStream) new ArchiveStreamFactory().createArchiveInputStream(ArchiveStreamFactory.TAR, new BufferedInputStream(is)); http://git-wip-us.apache.org/repos/asf/camel/blob/43775d08/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarIterator.java ---------------------------------------------------------------------- diff --git a/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarIterator.java b/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarIterator.java index 943ea44..c5c85ce 100644 --- a/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarIterator.java +++ b/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarIterator.java @@ -26,6 +26,7 @@ import java.util.Iterator; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.RuntimeCamelException; +import org.apache.camel.StreamCache; import org.apache.camel.impl.DefaultMessage; import org.apache.camel.util.IOHelper; import org.apache.commons.compress.archivers.ArchiveException; @@ -51,11 +52,13 @@ public class TarIterator implements Iterator<Message>, Closeable { private final Message inputMessage; private TarArchiveInputStream tarInputStream; - private Message parent; + private Message nextMessage; - public TarIterator(Message inputMessage, InputStream inputStream) { - this.inputMessage = inputMessage; - //InputStream inputStream = inputMessage.getBody(InputStream.class); + private Exchange exchange; + + public TarIterator(Exchange exchange, InputStream inputStream) { + this.exchange = exchange; + this.inputMessage = exchange.getIn(); if (inputStream instanceof TarArchiveInputStream) { tarInputStream = (TarArchiveInputStream) inputStream; @@ -67,51 +70,40 @@ public class TarIterator implements Iterator<Message>, Closeable { throw new RuntimeException(e.getMessage(), e); } } - parent = null; + nextMessage = null; } @Override public boolean hasNext() { - try { - if (tarInputStream == null) { - return false; - } - boolean availableDataInCurrentEntry = tarInputStream.available() > 0; - if (!availableDataInCurrentEntry) { - // advance to the next entry. - parent = getNextElement(); - if (parent == null) { - tarInputStream.close(); - availableDataInCurrentEntry = false; - } else { - availableDataInCurrentEntry = true; - } - } - return availableDataInCurrentEntry; - } catch (IOException exception) { - //Just wrap the IOException as CamelRuntimeException - throw new RuntimeCamelException(exception); - } + tryAdvanceToNext(); + + return this.nextMessage != null; } @Override public Message next() { - if (parent == null) { - parent = getNextElement(); - } - - Message answer = parent; - parent = null; - checkNullAnswer(answer); + tryAdvanceToNext(); - return answer; + //consume element + Message next = this.nextMessage; + this.nextMessage = null; + return next; } - private Message getNextElement() { - Message answer = null; + private void tryAdvanceToNext() { + //return current next + if (this.nextMessage != null) { + return; + } + + this.nextMessage = createNextMessage(); + checkNullAnswer(this.nextMessage); + } + + private Message createNextMessage() { if (tarInputStream == null) { - return answer; + return null; } try { @@ -119,12 +111,15 @@ public class TarIterator implements Iterator<Message>, Closeable { if (current != null) { LOGGER.debug("Reading tarEntry {}", current.getName()); - answer = new DefaultMessage(); + Message answer = new DefaultMessage(); answer.getHeaders().putAll(inputMessage.getHeaders()); answer.setHeader(TARFILE_ENTRY_NAME_HEADER, current.getName()); answer.setHeader(Exchange.FILE_NAME, current.getName()); if (current.getSize() > 0) { - answer.setBody(new TarElementInputStreamWrapper(tarInputStream)); + //Have to cache current entry's portion of tarInputStream here, because getNextTarEntry + //advances tarInputStream beyond current entry + answer.setBody(exchange.getContext().getTypeConverter().mandatoryConvertTo(StreamCache.class, exchange, + new TarElementInputStreamWrapper(tarInputStream))); } else { // Workaround for the case when the entry is zero bytes big answer.setBody(new ByteArrayInputStream(new byte[0])); @@ -132,19 +127,18 @@ public class TarIterator implements Iterator<Message>, Closeable { return answer; } else { LOGGER.trace("Closed tarInputStream"); + return null; } - } catch (IOException exception) { - //Just wrap the IOException as CamelRuntimeException + } catch (Exception exception) { + this.close(); + //Just wrap the Exception as CamelRuntimeException throw new RuntimeCamelException(exception); } - - return answer; } public void checkNullAnswer(Message answer) { - if (answer == null && tarInputStream != null) { - IOHelper.close(tarInputStream); - tarInputStream = null; + if (answer == null) { + this.close(); } } @@ -166,10 +160,9 @@ public class TarIterator implements Iterator<Message>, Closeable { } @Override - public void close() throws IOException { - if (tarInputStream != null) { - tarInputStream.close(); - tarInputStream = null; - } + public void close() { + //suppress any exceptions from closing + IOHelper.close(tarInputStream); + tarInputStream = null; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/43775d08/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarSplitter.java ---------------------------------------------------------------------- diff --git a/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarSplitter.java b/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarSplitter.java index 132dd55..3ee5371 100644 --- a/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarSplitter.java +++ b/components/camel-tarfile/src/main/java/org/apache/camel/dataformat/tarfile/TarSplitter.java @@ -33,7 +33,7 @@ public class TarSplitter implements Expression { public Object evaluate(Exchange exchange) { Message inputMessage = exchange.getIn(); - return new TarIterator(inputMessage, inputMessage.getBody(InputStream.class)); + return new TarIterator(exchange, inputMessage.getBody(InputStream.class)); } @Override