CAMEL-8688: Stream cache now keeps track of the number of copies that were spooled to disk, so that the temp file is only deleted once it is no longer in use. A file can be shared between exchanges, for example when using wire tap. Thanks to Franz Forsthofer for the patch.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a99f6d57 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a99f6d57 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a99f6d57 Branch: refs/heads/master Commit: a99f6d5710c3068fa3fe841d4f80dc82deb1142b Parents: cfdf185 Author: Claus Ibsen <davscl...@apache.org> Authored: Mon May 4 19:09:51 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon May 4 19:31:30 2015 +0200 ---------------------------------------------------------------------- .../main/java/org/apache/camel/StreamCache.java | 6 +- .../stream/ByteArrayInputStreamCache.java | 3 +- .../converter/stream/CachedOutputStream.java | 121 ++----------- .../converter/stream/FileInputStreamCache.java | 173 +++++++++++++++++-- .../converter/stream/InputStreamCache.java | 3 +- .../camel/converter/stream/ReaderCache.java | 3 +- .../camel/converter/stream/SourceCache.java | 3 +- .../converter/stream/StreamSourceCache.java | 6 +- .../camel/processor/MulticastProcessor.java | 2 +- .../camel/processor/WireTapProcessor.java | 2 +- .../processor/WireTapStreamCachingTest.java | 19 +- .../apache/camel/util/MessageHelperTest.java | 2 +- .../apache/camel/processor/twoCharacters.txt | 1 + .../http/NettyChannelBufferStreamCache.java | 3 +- 14 files changed, 207 insertions(+), 140 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/StreamCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/StreamCache.java b/camel-core/src/main/java/org/apache/camel/StreamCache.java index ecd9736..29f4284 100644 --- a/camel-core/src/main/java/org/apache/camel/StreamCache.java +++ b/camel-core/src/main/java/org/apache/camel/StreamCache.java @@ -29,7 +29,7 @@ import java.io.OutputStream; * 
The Camel routing engine uses the {@link org.apache.camel.processor.CamelInternalProcessor.StreamCachingAdvice} * to apply the stream cache during routing. * <p/> - * It is recommended in the {@link #copy()} method to let the copied stream start from the start. If the implementation + * It is recommended in the {@link #copy(Exchange)} method to let the copied stream start from the start. If the implementation * does not support copy, then return <tt>null</tt>. * * @version @@ -60,10 +60,12 @@ public interface StreamCache { * Implementations note: A copy of the stream is recommended to read from the start * of the stream. * + * @param exchange exchange in which the stream cache object is used; + * can be used to delete resources of the stream cache when the exchange is completed * @return a copy, or <tt>null</tt> if copy is not possible * @throws java.io.IOException is thrown if the copy fails */ - StreamCache copy() throws IOException; + StreamCache copy(Exchange exchange) throws IOException; /** * Whether this {@link StreamCache} is in memory only or http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java index 3b1dacf..9375ee3 100644 --- a/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java +++ b/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java @@ -22,6 +22,7 @@ import java.io.FilterInputStream; import java.io.IOException; import java.io.OutputStream; +import org.apache.camel.Exchange; import org.apache.camel.StreamCache; import org.apache.camel.util.IOHelper; @@ -51,7 +52,7 @@ public class ByteArrayInputStreamCache extends 
FilterInputStream implements Stre IOHelper.copyAndCloseInput(in, os); } - public StreamCache copy() throws IOException { + public StreamCache copy(Exchange exchange) throws IOException { if (byteArrayForCopy == null) { ByteArrayOutputStream baos = new ByteArrayOutputStream(in.available()); IOHelper.copy(in, baos); http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java index 639e339..d722baf 100644 --- a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java +++ b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java @@ -16,29 +16,15 @@ */ package org.apache.camel.converter.stream; -import java.io.BufferedOutputStream; import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.security.GeneralSecurityException; - -import javax.crypto.CipherOutputStream; import org.apache.camel.Exchange; import org.apache.camel.StreamCache; -import org.apache.camel.converter.stream.FileInputStreamCache.FileInputStreamCloser; +import org.apache.camel.converter.stream.FileInputStreamCache.TempFileManager; import org.apache.camel.spi.StreamCachingStrategy; -import org.apache.camel.spi.Synchronization; -import org.apache.camel.spi.UnitOfWork; -import org.apache.camel.support.SynchronizationAdapter; -import org.apache.camel.util.FileUtil; -import org.apache.camel.util.ObjectHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * This output stream will store the content into a File if the stream context size is 
exceed the @@ -50,7 +36,7 @@ import org.slf4j.LoggerFactory; * <p/> * You can get a cached input stream of this stream. The temp file which is created with this * output stream will be deleted when you close this output stream or the cached - * fileInputStream(s) is/are closed after the exchange is completed. + * fileInputStream(s) is/are closed after all the exchanges using the temp file are completed. */ public class CachedOutputStream extends OutputStream { @Deprecated @@ -61,16 +47,12 @@ public class CachedOutputStream extends OutputStream { public static final String TEMP_DIR = "CamelCachedOutputStreamOutputDirectory"; @Deprecated public static final String CIPHER_TRANSFORMATION = "CamelCachedOutputStreamCipherTransformation"; - private static final Logger LOG = LoggerFactory.getLogger(CachedOutputStream.class); private final StreamCachingStrategy strategy; private OutputStream currentStream; private boolean inMemory = true; private int totalLength; - private File tempFile; - private FileInputStreamCache fileInputStreamCache; - private final FileInputStreamCloser fileInputStreamCloser = new FileInputStreamCloser(); - private CipherPair ciphers; + private final TempFileManager tempFileManager; private final boolean closedOnCompletion; public CachedOutputStream(Exchange exchange) { @@ -79,44 +61,10 @@ public class CachedOutputStream extends OutputStream { public CachedOutputStream(Exchange exchange, final boolean closedOnCompletion) { this.closedOnCompletion = closedOnCompletion; + tempFileManager = new TempFileManager(closedOnCompletion); + tempFileManager.addExchange(exchange); this.strategy = exchange.getContext().getStreamCachingStrategy(); currentStream = new CachedByteArrayOutputStream(strategy.getBufferSize()); - if (closedOnCompletion) { - // add on completion so we can cleanup after the exchange is done such as deleting temporary files - Synchronization onCompletion = new SynchronizationAdapter() { - @Override - public void onDone(Exchange exchange) { - 
try { - closeFileInputStreams(); - close(); - try { - cleanUpTempFile(); - } catch (Exception e) { - LOG.warn("Error deleting temporary cache file: " + tempFile + ". This exception will be ignored.", e); - } - } catch (Exception e) { - LOG.warn("Error closing streams. This exception will be ignored.", e); - } - } - - @Override - public String toString() { - return "OnCompletion[CachedOutputStream]"; - } - }; - - UnitOfWork streamCacheUnitOfWork = exchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, UnitOfWork.class); - if (streamCacheUnitOfWork != null) { - // The stream cache must sometimes not be closed when the exchange is deleted. This is for example the - // case in the splitter and multi-cast case with AggregationStrategy where the result of the sub-routes - // are aggregated later in the main route. Here, the cached streams of the sub-routes must be closed with - // the Unit of Work of the main route. - streamCacheUnitOfWork.addSynchronization(onCompletion); - } else { - // add on completion so we can cleanup after the exchange is done such as deleting temporary files - exchange.addOnCompletion(onCompletion); - } - } } public void flush() throws IOException { @@ -127,12 +75,8 @@ public class CachedOutputStream extends OutputStream { currentStream.close(); // need to clean up the temp file this time if (!closedOnCompletion) { - closeFileInputStreams(); - try { - cleanUpTempFile(); - } catch (Exception e) { - LOG.warn("Error deleting temporary cache file: " + tempFile + ". 
This exception will be ignored.", e); - } + tempFileManager.closeFileInputStreams(); + tempFileManager.cleanUpTempFile(); } } @@ -206,40 +150,17 @@ public class CachedOutputStream extends OutputStream { throw new IllegalStateException("CurrentStream should be an instance of CachedByteArrayOutputStream but is: " + currentStream.getClass().getName()); } } else { - try { - if (fileInputStreamCache == null) { - fileInputStreamCache = new FileInputStreamCache(tempFile, ciphers, fileInputStreamCloser); - } - return fileInputStreamCache; - } catch (FileNotFoundException e) { - throw new IOException("Cached file " + tempFile + " not found", e); - } + return tempFileManager.newStreamCache(); } } - private void closeFileInputStreams() { - fileInputStreamCloser.close(); - fileInputStreamCache = null; - } - - private void cleanUpTempFile() { - // cleanup temporary file - if (tempFile != null) { - FileUtil.deleteFile(tempFile); - tempFile = null; - } - } private void pageToFileStream() throws IOException { flush(); - ByteArrayOutputStream bout = (ByteArrayOutputStream)currentStream; - tempFile = FileUtil.createTempFile("cos", ".tmp", strategy.getSpoolDirectory()); - - LOG.trace("Creating temporary stream cache file: {}", tempFile); - try { - currentStream = createOutputStream(tempFile); + // creates an tmp file and a file output stream + currentStream = tempFileManager.createOutputStream(strategy); bout.writeTo(currentStream); } finally { // ensure flag is flipped to file based @@ -291,26 +212,4 @@ public class CachedOutputStream extends OutputStream { } } - private OutputStream createOutputStream(File file) throws IOException { - OutputStream out = new BufferedOutputStream(new FileOutputStream(file)); - if (ObjectHelper.isNotEmpty(strategy.getSpoolChiper())) { - try { - if (ciphers == null) { - ciphers = new CipherPair(strategy.getSpoolChiper()); - } - } catch (GeneralSecurityException e) { - throw new IOException(e.getMessage(), e); - } - out = new CipherOutputStream(out, 
ciphers.getEncryptor()) { - boolean closed; - public void close() throws IOException { - if (!closed) { - super.close(); - closed = true; - } - } - }; - } - return out; - } } http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java index c6e99ac..a0d6501 100644 --- a/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java +++ b/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java @@ -17,44 +17,60 @@ package org.apache.camel.converter.stream; import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.channels.Channels; import java.nio.channels.FileChannel; import java.nio.channels.WritableByteChannel; +import java.security.GeneralSecurityException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + import javax.crypto.CipherInputStream; +import javax.crypto.CipherOutputStream; +import org.apache.camel.Exchange; import org.apache.camel.RuntimeCamelException; import org.apache.camel.StreamCache; +import org.apache.camel.spi.StreamCachingStrategy; +import org.apache.camel.spi.Synchronization; +import org.apache.camel.spi.UnitOfWork; +import org.apache.camel.support.SynchronizationAdapter; +import org.apache.camel.util.FileUtil; import org.apache.camel.util.IOHelper; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A {@link 
StreamCache} for {@link File}s */ public final class FileInputStreamCache extends InputStream implements StreamCache { private InputStream stream; + private final long length; + private final FileInputStreamCache.TempFileManager tempFileManager; private final File file; private final CipherPair ciphers; - private final long length; - private final FileInputStreamCache.FileInputStreamCloser closer; + /** Only for testing purposes.*/ public FileInputStreamCache(File file) throws FileNotFoundException { - this(file, null, new FileInputStreamCloser()); + this(new TempFileManager(file, true)); } - FileInputStreamCache(File file, CipherPair ciphers, FileInputStreamCloser closer) throws FileNotFoundException { - this.file = file; + FileInputStreamCache(TempFileManager closer) throws FileNotFoundException { + this.file = closer.getTempFile(); this.stream = null; - this.ciphers = ciphers; + this.ciphers = closer.getCiphers(); this.length = file.length(); - this.closer = closer; - this.closer.add(this); + this.tempFileManager = closer; + this.tempFileManager.add(this); } @Override @@ -99,8 +115,9 @@ public final class FileInputStreamCache extends InputStream implements StreamCac } } - public StreamCache copy() throws IOException { - FileInputStreamCache copy = new FileInputStreamCache(file, ciphers, closer); + public StreamCache copy(Exchange exchange) throws IOException { + tempFileManager.addExchange(exchange); + FileInputStreamCache copy = new FileInputStreamCache(tempFileManager); return copy; } @@ -146,16 +163,37 @@ public final class FileInputStreamCache extends InputStream implements StreamCac } /** - * Collects all FileInputStreamCache instances of a temporary file which must be closed - * at the end of the route. + * Manages the temporary file for the file input stream caches. + * + * Collects all FileInputStreamCache instances of the temporary file. + * Counts the number of exchanges which have a FileInputStreamCache instance of the temporary file. 
+ * Deletes the temporary file, if all exchanges are done. * * @see CachedOutputStream */ - static class FileInputStreamCloser { + static class TempFileManager { + + private static final Logger LOG = LoggerFactory.getLogger(TempFileManager.class); + /** Indicator whether the file input stream caches are closed on completion of the exchanges. */ + private final boolean closedOnCompletion; + private AtomicInteger exchangeCounter = new AtomicInteger(); + private File tempFile; + private OutputStream outputStream; // file output stream + private CipherPair ciphers; - // there can be several input streams, for example in the multi-cast parallel processing + // there can be several input streams, for example in the multi-cast, or wiretap parallel processing private List<FileInputStreamCache> fileInputStreamCaches; + + /** Only for testing.*/ + private TempFileManager(File file, boolean closedOnCompletion) { + this(closedOnCompletion); + this.tempFile = file; + } + TempFileManager(boolean closedOnCompletion) { + this.closedOnCompletion = closedOnCompletion; + } + /** Adds a FileInputStreamCache instance to the closer. * <p> * Must be synchronized, because can be accessed by several threads. 
@@ -167,14 +205,119 @@ public final class FileInputStreamCache extends InputStream implements StreamCac fileInputStreamCaches.add(fileInputStreamCache); } - void close() { + void addExchange(Exchange exchange) { + if (closedOnCompletion) { + exchangeCounter.incrementAndGet(); + // add on completion so we can cleanup after the exchange is done such as deleting temporary files + Synchronization onCompletion = new SynchronizationAdapter() { + @Override + public void onDone(Exchange exchange) { + int actualExchanges = exchangeCounter.decrementAndGet(); + if (actualExchanges == 0) { + // only one exchange (one thread) left, therefore we must not synchronize the following lines of code + try { + closeFileInputStreams(); + if (outputStream != null) { + outputStream.close(); + } + try { + cleanUpTempFile(); + } catch (Exception e) { + LOG.warn("Error deleting temporary cache file: " + tempFile + ". This exception will be ignored.", e); + } + } catch (Exception e) { + LOG.warn("Error closing streams. This exception will be ignored.", e); + } + } + } + + @Override + public String toString() { + return "OnCompletion[CachedOutputStream]"; + } + }; + UnitOfWork streamCacheUnitOfWork = exchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, UnitOfWork.class); + if (streamCacheUnitOfWork != null) { + // The stream cache must sometimes not be closed when the exchange is deleted. This is for example the + // case in the splitter and multi-cast case with AggregationStrategy where the result of the sub-routes + // are aggregated later in the main route. Here, the cached streams of the sub-routes must be closed with + // the Unit of Work of the main route. 
+ streamCacheUnitOfWork.addSynchronization(onCompletion); + } else { + // add on completion so we can cleanup after the exchange is done such as deleting temporary files + exchange.addOnCompletion(onCompletion); + } + } + } + + OutputStream createOutputStream(StreamCachingStrategy strategy) throws IOException { + // should only be called once + if (tempFile != null) { + throw new IllegalStateException("The method 'createOutputStream' can only be called once!"); + } + tempFile = FileUtil.createTempFile("cos", ".tmp", strategy.getSpoolDirectory()); + + LOG.trace("Creating temporary stream cache file: {}", tempFile); + OutputStream out = new BufferedOutputStream(new FileOutputStream(tempFile)); + if (ObjectHelper.isNotEmpty(strategy.getSpoolChiper())) { + try { + if (ciphers == null) { + ciphers = new CipherPair(strategy.getSpoolChiper()); + } + } catch (GeneralSecurityException e) { + throw new IOException(e.getMessage(), e); + } + out = new CipherOutputStream(out, ciphers.getEncryptor()) { + boolean closed; + public void close() throws IOException { + if (!closed) { + super.close(); + closed = true; + } + } + }; + } + outputStream = out; + return out; + } + + FileInputStreamCache newStreamCache() throws IOException { + try { + return new FileInputStreamCache(this); + } catch (FileNotFoundException e) { + throw new IOException("Cached file " + tempFile + " not found", e); + } + } + + void closeFileInputStreams() { if (fileInputStreamCaches != null) { for (FileInputStreamCache fileInputStreamCache : fileInputStreamCaches) { fileInputStreamCache.close(); } fileInputStreamCaches.clear(); } + } + + void cleanUpTempFile() { + // cleanup temporary file + try { + if (tempFile != null) { + FileUtil.deleteFile(tempFile); + tempFile = null; + } + } catch (Exception e) { + LOG.warn("Error deleting temporary cache file: " + tempFile + ". 
This exception will be ignored.", e); + } + } + + File getTempFile() { + return tempFile; + } + + CipherPair getCiphers() { + return ciphers; } + } } http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java index ba7f12e..78422a7 100644 --- a/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java +++ b/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java @@ -20,6 +20,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.OutputStream; +import org.apache.camel.Exchange; import org.apache.camel.StreamCache; /** @@ -40,7 +41,7 @@ public final class InputStreamCache extends ByteArrayInputStream implements Stre os.write(buf, pos, count - pos); } - public StreamCache copy() { + public StreamCache copy(Exchange exchange) { return new InputStreamCache(buf, count); } http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java index bed761c..2890945 100644 --- a/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java +++ b/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.OutputStream; import java.io.StringReader; +import org.apache.camel.Exchange; import org.apache.camel.StreamCache; /** @@ -51,7 +52,7 @@ public class ReaderCache extends StringReader 
implements StreamCache { os.write(data.getBytes()); } - public StreamCache copy() throws IOException { + public StreamCache copy(Exchange exchange) throws IOException { return new ReaderCache(data); } http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java index 16f8422..4f00eb4 100644 --- a/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java +++ b/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java @@ -19,6 +19,7 @@ package org.apache.camel.converter.stream; import java.io.IOException; import java.io.OutputStream; +import org.apache.camel.Exchange; import org.apache.camel.StreamCache; import org.apache.camel.StringSource; import org.apache.camel.util.IOHelper; @@ -44,7 +45,7 @@ public final class SourceCache extends StringSource implements StreamCache { IOHelper.copy(getInputStream(), os); } - public StreamCache copy() throws IOException { + public StreamCache copy(Exchange exchange) throws IOException { return new SourceCache(getText()); } http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java index 499f799..a7edfc9 100644 --- a/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java +++ b/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java @@ -83,12 +83,12 @@ public final class StreamSourceCache extends StreamSource 
implements StreamCache } } - public StreamCache copy() throws IOException { + public StreamCache copy(Exchange exchange) throws IOException { if (streamCache != null) { - return new StreamSourceCache(streamCache.copy()); + return new StreamSourceCache(streamCache.copy(exchange)); } if (readCache != null) { - return new StreamSourceCache(readCache.copy()); + return new StreamSourceCache(readCache.copy(exchange)); } return null; } http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java index 334ceb1..e4a2ef8 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -953,7 +953,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor if (index > 0) { // copy it otherwise parallel processing is not possible, // because streams can only be read once - StreamCache copiedStreamCache = streamCache.copy(); + StreamCache copiedStreamCache = streamCache.copy(copy); if (copiedStreamCache != null) { copy.getIn().setBody(copiedStreamCache); } http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java index a74e663..1d6b835 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java @@ -180,7 +180,7 @@ public class 
WireTapProcessor extends ServiceSupport implements AsyncProcessor, if (msg.getBody() instanceof StreamCache) { // in parallel processing case, the stream must be copied, therefore get the stream StreamCache cache = (StreamCache) msg.getBody(); - StreamCache copied = cache.copy(); + StreamCache copied = cache.copy(answer); if (copied != null) { msg.setBody(copied); } http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/test/java/org/apache/camel/processor/WireTapStreamCachingTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/WireTapStreamCachingTest.java b/camel-core/src/test/java/org/apache/camel/processor/WireTapStreamCachingTest.java index 0a87c13..1db7307 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/WireTapStreamCachingTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/WireTapStreamCachingTest.java @@ -17,6 +17,7 @@ package org.apache.camel.processor; import java.io.StringReader; + import javax.xml.transform.stream.StreamSource; import org.apache.camel.ContextTestSupport; @@ -26,6 +27,7 @@ import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; /** * @version @@ -51,7 +53,19 @@ public class WireTapStreamCachingTest extends ContextTestSupport { assertMockEndpointsSatisfied(); } + + @Test + public void testSendingAMessageUsingWiretapShouldNotDeleteStreamFileBeforeAllExcangesAreComplete() throws InterruptedException { + + x.expectedMessageCount(1); + y.expectedMessageCount(1); + z.expectedMessageCount(1); + + // the used file should contain more than one character in order to be streamed into the file system + template.sendBody("direct:a", this.getClass().getClassLoader().getResourceAsStream("org/apache/camel/processor/twoCharacters.txt")); + assertMockEndpointsSatisfied(); + } 
@Override protected void setUp() throws Exception { @@ -76,6 +90,8 @@ public class WireTapStreamCachingTest extends ContextTestSupport { public void configure() { // enable stream caching context.setStreamCaching(true); + // set stream threshold to 1, in order to stream into the file system + context.getStreamCachingStrategy().setSpoolThreshold(1); errorHandler(deadLetterChannel("mock:error").redeliveryDelay(0).maximumRedeliveries(3)); @@ -83,7 +99,8 @@ public class WireTapStreamCachingTest extends ContextTestSupport { from("direct:a").wireTap("direct:x").wireTap("direct:y").wireTap("direct:z"); from("direct:x").process(processor).to("mock:x"); - from("direct:y").process(processor).to("mock:y"); + // even if a process takes more time then the others the wire tap shall work + from("direct:y").delay(2000).process(processor).to("mock:y"); from("direct:z").process(processor).to("mock:z"); } }; http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java b/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java index 7448989..cf80614 100644 --- a/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java +++ b/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java @@ -63,7 +63,7 @@ public class MessageHelperTest extends TestCase { // noop } - public StreamCache copy() throws IOException { + public StreamCache copy(Exchange exchange) throws IOException { return null; } http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/test/resources/org/apache/camel/processor/twoCharacters.txt ---------------------------------------------------------------------- diff --git a/camel-core/src/test/resources/org/apache/camel/processor/twoCharacters.txt 
b/camel-core/src/test/resources/org/apache/camel/processor/twoCharacters.txt new file mode 100644 index 0000000..dfc9179 --- /dev/null +++ b/camel-core/src/test/resources/org/apache/camel/processor/twoCharacters.txt @@ -0,0 +1 @@ +AB \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java ---------------------------------------------------------------------- diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java index 8ecb8f7..b3afc4a 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import org.apache.camel.Exchange; import org.apache.camel.StreamCache; import org.apache.camel.util.IOHelper; import org.jboss.netty.buffer.ChannelBuffer; @@ -87,7 +88,7 @@ public final class NettyChannelBufferStreamCache extends InputStream implements } @Override - public StreamCache copy() throws IOException { + public StreamCache copy(Exchange exchange) throws IOException { return new NettyChannelBufferStreamCache(buffer.copy()); }