CAMEL-8688: Stream cache now keeps track of the number of copies that were 
spooled to disk, so that the temp file is only deleted when it is no longer in 
use. A file can be shared when using wire tap etc. Thanks to Franz Forsthofer 
for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a99f6d57
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a99f6d57
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a99f6d57

Branch: refs/heads/master
Commit: a99f6d5710c3068fa3fe841d4f80dc82deb1142b
Parents: cfdf185
Author: Claus Ibsen <davscl...@apache.org>
Authored: Mon May 4 19:09:51 2015 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Mon May 4 19:31:30 2015 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/camel/StreamCache.java |   6 +-
 .../stream/ByteArrayInputStreamCache.java       |   3 +-
 .../converter/stream/CachedOutputStream.java    | 121 ++-----------
 .../converter/stream/FileInputStreamCache.java  | 173 +++++++++++++++++--
 .../converter/stream/InputStreamCache.java      |   3 +-
 .../camel/converter/stream/ReaderCache.java     |   3 +-
 .../camel/converter/stream/SourceCache.java     |   3 +-
 .../converter/stream/StreamSourceCache.java     |   6 +-
 .../camel/processor/MulticastProcessor.java     |   2 +-
 .../camel/processor/WireTapProcessor.java       |   2 +-
 .../processor/WireTapStreamCachingTest.java     |  19 +-
 .../apache/camel/util/MessageHelperTest.java    |   2 +-
 .../apache/camel/processor/twoCharacters.txt    |   1 +
 .../http/NettyChannelBufferStreamCache.java     |   3 +-
 14 files changed, 207 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/StreamCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/StreamCache.java 
b/camel-core/src/main/java/org/apache/camel/StreamCache.java
index ecd9736..29f4284 100644
--- a/camel-core/src/main/java/org/apache/camel/StreamCache.java
+++ b/camel-core/src/main/java/org/apache/camel/StreamCache.java
@@ -29,7 +29,7 @@ import java.io.OutputStream;
  * The Camel routing engine uses the {@link 
org.apache.camel.processor.CamelInternalProcessor.StreamCachingAdvice}
  * to apply the stream cache during routing.
  * <p/>
- * It is recommended in the {@link #copy()} method to let the copied stream 
start from the start. If the implementation
+ * It is recommended in the {@link #copy(Exchange)} method to let the copied 
stream start from the start. If the implementation
  * does not support copy, then return <tt>null</tt>.
  *
  * @version 
@@ -60,10 +60,12 @@ public interface StreamCache {
      * Implementations note: A copy of the stream is recommended to read from 
the start
      * of the stream.
      *
+     * @param exchange exchange in which the stream cache object is used; 
+     *                 can be used to delete resources of the stream cache 
when the exchange is completed
      * @return a copy, or <tt>null</tt> if copy is not possible
      * @throws java.io.IOException is thrown if the copy fails
      */
-    StreamCache copy() throws IOException;
+    StreamCache copy(Exchange exchange) throws IOException;
 
     /**
      * Whether this {@link StreamCache} is in memory only or

http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java
 
b/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java
index 3b1dacf..9375ee3 100644
--- 
a/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java
+++ 
b/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java
@@ -22,6 +22,7 @@ import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.StreamCache;
 import org.apache.camel.util.IOHelper;
 
@@ -51,7 +52,7 @@ public class ByteArrayInputStreamCache extends 
FilterInputStream implements Stre
         IOHelper.copyAndCloseInput(in, os);
     }
 
-    public StreamCache copy() throws IOException {
+    public StreamCache copy(Exchange exchange) throws IOException {
         if (byteArrayForCopy == null) {
             ByteArrayOutputStream baos = new 
ByteArrayOutputStream(in.available());
             IOHelper.copy(in, baos);

http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
 
b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
index 639e339..d722baf 100644
--- 
a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
+++ 
b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
@@ -16,29 +16,15 @@
  */
 package org.apache.camel.converter.stream;
 
-import java.io.BufferedOutputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.security.GeneralSecurityException;
-
-import javax.crypto.CipherOutputStream;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.StreamCache;
-import 
org.apache.camel.converter.stream.FileInputStreamCache.FileInputStreamCloser;
+import org.apache.camel.converter.stream.FileInputStreamCache.TempFileManager;
 import org.apache.camel.spi.StreamCachingStrategy;
-import org.apache.camel.spi.Synchronization;
-import org.apache.camel.spi.UnitOfWork;
-import org.apache.camel.support.SynchronizationAdapter;
-import org.apache.camel.util.FileUtil;
-import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * This output stream will store the content into a File if the stream context 
size is exceed the
@@ -50,7 +36,7 @@ import org.slf4j.LoggerFactory;
  * <p/>
  * You can get a cached input stream of this stream. The temp file which is 
created with this 
  * output stream will be deleted when you close this output stream or the 
cached 
- * fileInputStream(s) is/are closed after the exchange is completed.
+ * fileInputStream(s) is/are closed after all the exchanges using the temp 
file are completed.
  */
 public class CachedOutputStream extends OutputStream {
     @Deprecated
@@ -61,16 +47,12 @@ public class CachedOutputStream extends OutputStream {
     public static final String TEMP_DIR = 
"CamelCachedOutputStreamOutputDirectory";
     @Deprecated
     public static final String CIPHER_TRANSFORMATION = 
"CamelCachedOutputStreamCipherTransformation";
-    private static final Logger LOG = 
LoggerFactory.getLogger(CachedOutputStream.class);
 
     private final StreamCachingStrategy strategy;
     private OutputStream currentStream;
     private boolean inMemory = true;
     private int totalLength;
-    private File tempFile;
-    private FileInputStreamCache fileInputStreamCache;
-    private final FileInputStreamCloser fileInputStreamCloser = new 
FileInputStreamCloser();
-    private CipherPair ciphers;
+    private final TempFileManager tempFileManager;
     private final boolean closedOnCompletion;
 
     public CachedOutputStream(Exchange exchange) {
@@ -79,44 +61,10 @@ public class CachedOutputStream extends OutputStream {
 
     public CachedOutputStream(Exchange exchange, final boolean 
closedOnCompletion) {
         this.closedOnCompletion = closedOnCompletion;
+        tempFileManager = new TempFileManager(closedOnCompletion);
+        tempFileManager.addExchange(exchange);
         this.strategy = exchange.getContext().getStreamCachingStrategy();
         currentStream = new 
CachedByteArrayOutputStream(strategy.getBufferSize());
-        if (closedOnCompletion) {
-            // add on completion so we can cleanup after the exchange is done 
such as deleting temporary files
-            Synchronization onCompletion = new SynchronizationAdapter() {
-                @Override
-                public void onDone(Exchange exchange) {
-                    try {
-                        closeFileInputStreams();
-                        close();
-                        try {
-                            cleanUpTempFile();
-                        } catch (Exception e) {
-                            LOG.warn("Error deleting temporary cache file: " + 
tempFile + ". This exception will be ignored.", e);
-                        }
-                    } catch (Exception e) {
-                        LOG.warn("Error closing streams. This exception will 
be ignored.", e);
-                    }
-                }
-
-                @Override
-                public String toString() {
-                    return "OnCompletion[CachedOutputStream]";
-                }
-            };
-
-            UnitOfWork streamCacheUnitOfWork = 
exchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, UnitOfWork.class);
-            if (streamCacheUnitOfWork != null) {
-                // The stream cache must sometimes not be closed when the 
exchange is deleted. This is for example the
-                // case in the splitter and multi-cast case with 
AggregationStrategy where the result of the sub-routes
-                // are aggregated later in the main route. Here, the cached 
streams of the sub-routes must be closed with
-                // the Unit of Work of the main route.
-                streamCacheUnitOfWork.addSynchronization(onCompletion);
-            } else {
-                // add on completion so we can cleanup after the exchange is 
done such as deleting temporary files
-                exchange.addOnCompletion(onCompletion);
-            }
-        }
     }
 
     public void flush() throws IOException {
@@ -127,12 +75,8 @@ public class CachedOutputStream extends OutputStream {
         currentStream.close();
         // need to clean up the temp file this time
         if (!closedOnCompletion) {
-            closeFileInputStreams();
-            try {
-                cleanUpTempFile();
-            } catch (Exception e) {
-                LOG.warn("Error deleting temporary cache file: " + tempFile + 
". This exception will be ignored.", e);
-            }
+            tempFileManager.closeFileInputStreams();
+            tempFileManager.cleanUpTempFile();
         }
     }
 
@@ -206,40 +150,17 @@ public class CachedOutputStream extends OutputStream {
                 throw new IllegalStateException("CurrentStream should be an 
instance of CachedByteArrayOutputStream but is: " + 
currentStream.getClass().getName());
             }
         } else {
-            try {
-                if (fileInputStreamCache == null) {
-                    fileInputStreamCache = new FileInputStreamCache(tempFile, 
ciphers, fileInputStreamCloser);
-                }
-                return fileInputStreamCache;
-            } catch (FileNotFoundException e) {
-                throw new IOException("Cached file " + tempFile + " not 
found", e);
-            }
+            return tempFileManager.newStreamCache();
         }
     }
     
-    private void closeFileInputStreams() {
-        fileInputStreamCloser.close();
-        fileInputStreamCache = null;
-    } 
-
-    private void cleanUpTempFile() {
-        // cleanup temporary file
-        if (tempFile != null) {
-            FileUtil.deleteFile(tempFile);
-            tempFile = null;
-        }
-    }
 
     private void pageToFileStream() throws IOException {
         flush();
-
         ByteArrayOutputStream bout = (ByteArrayOutputStream)currentStream;
-        tempFile = FileUtil.createTempFile("cos", ".tmp", 
strategy.getSpoolDirectory());
-
-        LOG.trace("Creating temporary stream cache file: {}", tempFile);
-
         try {
-            currentStream = createOutputStream(tempFile);
+            // creates an tmp file and a file output stream
+            currentStream = tempFileManager.createOutputStream(strategy);
             bout.writeTo(currentStream);
         } finally {
             // ensure flag is flipped to file based
@@ -291,26 +212,4 @@ public class CachedOutputStream extends OutputStream {
         }
     }
 
-    private OutputStream createOutputStream(File file) throws IOException {
-        OutputStream out = new BufferedOutputStream(new 
FileOutputStream(file));
-        if (ObjectHelper.isNotEmpty(strategy.getSpoolChiper())) {
-            try {
-                if (ciphers == null) {
-                    ciphers = new CipherPair(strategy.getSpoolChiper());
-                }
-            } catch (GeneralSecurityException e) {
-                throw new IOException(e.getMessage(), e);
-            }
-            out = new CipherOutputStream(out, ciphers.getEncryptor()) {
-                boolean closed;
-                public void close() throws IOException {
-                    if (!closed) {
-                        super.close();
-                        closed = true;
-                    }
-                }
-            };
-        }
-        return out;
-    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
 
b/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
index c6e99ac..a0d6501 100644
--- 
a/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
+++ 
b/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
@@ -17,44 +17,60 @@
 package org.apache.camel.converter.stream;
 
 import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.channels.Channels;
 import java.nio.channels.FileChannel;
 import java.nio.channels.WritableByteChannel;
+import java.security.GeneralSecurityException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import javax.crypto.CipherInputStream;
+import javax.crypto.CipherOutputStream;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.StreamCache;
+import org.apache.camel.spi.StreamCachingStrategy;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.spi.UnitOfWork;
+import org.apache.camel.support.SynchronizationAdapter;
+import org.apache.camel.util.FileUtil;
 import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A {@link StreamCache} for {@link File}s
  */
 public final class FileInputStreamCache extends InputStream implements 
StreamCache {
     private InputStream stream;
+    private final long length;
+    private final FileInputStreamCache.TempFileManager tempFileManager;
     private final File file;
     private final CipherPair ciphers;
-    private final long length;
-    private final FileInputStreamCache.FileInputStreamCloser closer;
 
+    /** Only for testing purposes.*/
     public FileInputStreamCache(File file) throws FileNotFoundException {
-        this(file, null, new FileInputStreamCloser());
+        this(new TempFileManager(file, true));
     }
     
-    FileInputStreamCache(File file, CipherPair ciphers, FileInputStreamCloser 
closer) throws FileNotFoundException {
-        this.file = file;
+    FileInputStreamCache(TempFileManager closer) throws FileNotFoundException {
+        this.file = closer.getTempFile();
         this.stream = null;
-        this.ciphers = ciphers;
+        this.ciphers = closer.getCiphers();
         this.length = file.length();
-        this.closer = closer;
-        this.closer.add(this);
+        this.tempFileManager = closer;
+        this.tempFileManager.add(this);
     }
     
     @Override
@@ -99,8 +115,9 @@ public final class FileInputStreamCache extends InputStream 
implements StreamCac
         }
     }
 
-    public StreamCache copy() throws IOException {
-        FileInputStreamCache copy = new FileInputStreamCache(file, ciphers, 
closer);
+    public StreamCache copy(Exchange exchange) throws IOException {
+        tempFileManager.addExchange(exchange);
+        FileInputStreamCache copy = new FileInputStreamCache(tempFileManager);
         return copy;
     }
 
@@ -146,16 +163,37 @@ public final class FileInputStreamCache extends 
InputStream implements StreamCac
     }
 
     /** 
-     * Collects all FileInputStreamCache instances of a temporary file which 
must be closed
-     * at the end of the route.
+     * Manages the temporary file for the file input stream caches.
+     * 
+     * Collects all FileInputStreamCache instances of the temporary file.
+     * Counts the number of exchanges which have a FileInputStreamCache  
instance of the temporary file.
+     * Deletes the temporary file, if all exchanges are done.
      * 
      * @see CachedOutputStream
      */
-    static class FileInputStreamCloser {
+    static class TempFileManager {
+        
+        private static final Logger LOG = 
LoggerFactory.getLogger(TempFileManager.class);
+        /** Indicator whether the file input stream caches are closed on 
completion of the exchanges. */
+        private final boolean closedOnCompletion;
+        private AtomicInteger exchangeCounter = new AtomicInteger();
+        private File tempFile;
+        private OutputStream outputStream; // file output stream
+        private CipherPair ciphers;
         
-        // there can be several input streams, for example in the multi-cast 
parallel processing
+        // there can be several input streams, for example in the multi-cast, 
or wiretap parallel processing
         private List<FileInputStreamCache> fileInputStreamCaches;
+
+        /** Only for testing.*/
+        private TempFileManager(File file, boolean closedOnCompletion) {
+            this(closedOnCompletion);
+            this.tempFile = file;
+        }
         
+        TempFileManager(boolean closedOnCompletion) {
+            this.closedOnCompletion = closedOnCompletion;
+        }
+                
         /** Adds a FileInputStreamCache instance to the closer.
          * <p>
          * Must be synchronized, because can be accessed by several threads. 
@@ -167,14 +205,119 @@ public final class FileInputStreamCache extends 
InputStream implements StreamCac
             fileInputStreamCaches.add(fileInputStreamCache);
         }
         
-        void close() {
+        void addExchange(Exchange exchange) {
+            if (closedOnCompletion) {
+                exchangeCounter.incrementAndGet();
+                // add on completion so we can cleanup after the exchange is 
done such as deleting temporary files
+                Synchronization onCompletion = new SynchronizationAdapter() {
+                    @Override
+                    public void onDone(Exchange exchange) {
+                        int actualExchanges = 
exchangeCounter.decrementAndGet();
+                        if (actualExchanges == 0) {
+                            // only one exchange (one thread) left, therefore 
we must not synchronize the following lines of code
+                            try {                              
+                                closeFileInputStreams();
+                                if (outputStream != null) {
+                                    outputStream.close();
+                                }
+                                try {
+                                    cleanUpTempFile();
+                                } catch (Exception e) {
+                                    LOG.warn("Error deleting temporary cache 
file: " + tempFile + ". This exception will be ignored.", e);
+                                }
+                            } catch (Exception e) {
+                                LOG.warn("Error closing streams. This 
exception will be ignored.", e);
+                            }
+                        }
+                    }
+
+                    @Override
+                    public String toString() {
+                        return "OnCompletion[CachedOutputStream]";
+                    }
+                };
+                UnitOfWork streamCacheUnitOfWork = 
exchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, UnitOfWork.class);
+                if (streamCacheUnitOfWork != null) {
+                    // The stream cache must sometimes not be closed when the 
exchange is deleted. This is for example the
+                    // case in the splitter and multi-cast case with 
AggregationStrategy where the result of the sub-routes
+                    // are aggregated later in the main route. Here, the 
cached streams of the sub-routes must be closed with
+                    // the Unit of Work of the main route.
+                    streamCacheUnitOfWork.addSynchronization(onCompletion);
+                } else {
+                    // add on completion so we can cleanup after the exchange 
is done such as deleting temporary files
+                    exchange.addOnCompletion(onCompletion);
+                }
+            }
+        }
+        
+        OutputStream createOutputStream(StreamCachingStrategy strategy) throws 
IOException {
+            // should only be called once
+            if (tempFile != null) {
+                throw new IllegalStateException("The method 
'createOutputStream' can only be called once!");
+            }
+            tempFile = FileUtil.createTempFile("cos", ".tmp", 
strategy.getSpoolDirectory());
+
+            LOG.trace("Creating temporary stream cache file: {}", tempFile);
+            OutputStream out = new BufferedOutputStream(new 
FileOutputStream(tempFile));
+            if (ObjectHelper.isNotEmpty(strategy.getSpoolChiper())) {
+                try {
+                    if (ciphers == null) {
+                        ciphers = new CipherPair(strategy.getSpoolChiper());
+                    }
+                } catch (GeneralSecurityException e) {
+                    throw new IOException(e.getMessage(), e);
+                }
+                out = new CipherOutputStream(out, ciphers.getEncryptor()) {
+                    boolean closed;
+                    public void close() throws IOException {
+                        if (!closed) {
+                            super.close();
+                            closed = true;
+                        }
+                    }
+                };
+            }
+            outputStream = out;
+            return out;
+        }
+        
+        FileInputStreamCache newStreamCache() throws IOException {
+            try {
+                return new FileInputStreamCache(this);
+            } catch (FileNotFoundException e) {
+                throw new IOException("Cached file " + tempFile + " not 
found", e);
+            }
+        }
+        
+        void closeFileInputStreams() {
             if (fileInputStreamCaches != null) {
                 for (FileInputStreamCache fileInputStreamCache : 
fileInputStreamCaches) {
                     fileInputStreamCache.close();
                 }
                 fileInputStreamCaches.clear();
             }
+        } 
+
+        void cleanUpTempFile() {
+            // cleanup temporary file
+            try {
+                if (tempFile != null) {
+                    FileUtil.deleteFile(tempFile);
+                    tempFile = null;
+                }
+            } catch (Exception e) {
+                LOG.warn("Error deleting temporary cache file: " + tempFile + 
". This exception will be ignored.", e);
+            }
+        }
+        
+        File getTempFile() {
+            return tempFile;
+        }
+        
+        CipherPair getCiphers() {
+            return ciphers;
         }
+        
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
 
b/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
index ba7f12e..78422a7 100644
--- 
a/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
+++ 
b/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
@@ -20,6 +20,7 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.StreamCache;
 
 /**
@@ -40,7 +41,7 @@ public final class InputStreamCache extends 
ByteArrayInputStream implements Stre
         os.write(buf, pos, count - pos);
     }
 
-    public StreamCache copy() {
+    public StreamCache copy(Exchange exchange) {
         return new InputStreamCache(buf, count);
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java 
b/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
index bed761c..2890945 100644
--- 
a/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
+++ 
b/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.io.StringReader;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.StreamCache;
 
 /**
@@ -51,7 +52,7 @@ public class ReaderCache extends StringReader implements 
StreamCache {
         os.write(data.getBytes());
     }
 
-    public StreamCache copy() throws IOException {
+    public StreamCache copy(Exchange exchange) throws IOException {
         return new ReaderCache(data);
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java 
b/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
index 16f8422..4f00eb4 100644
--- 
a/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
+++ 
b/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
@@ -19,6 +19,7 @@ package org.apache.camel.converter.stream;
 import java.io.IOException;
 import java.io.OutputStream;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.StreamCache;
 import org.apache.camel.StringSource;
 import org.apache.camel.util.IOHelper;
@@ -44,7 +45,7 @@ public final class SourceCache extends StringSource 
implements StreamCache {
         IOHelper.copy(getInputStream(), os);
     }
 
-    public StreamCache copy() throws IOException {
+    public StreamCache copy(Exchange exchange) throws IOException {
         return new SourceCache(getText());
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
 
b/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
index 499f799..a7edfc9 100644
--- 
a/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
+++ 
b/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
@@ -83,12 +83,12 @@ public final class StreamSourceCache extends StreamSource 
implements StreamCache
         }
     }
 
-    public StreamCache copy() throws IOException {
+    public StreamCache copy(Exchange exchange) throws IOException {
         if (streamCache != null) {
-            return new StreamSourceCache(streamCache.copy());
+            return new StreamSourceCache(streamCache.copy(exchange));
         }
         if (readCache != null) {
-            return new StreamSourceCache(readCache.copy());
+            return new StreamSourceCache(readCache.copy(exchange));
         }
         return null;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java 
b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 334ceb1..e4a2ef8 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -953,7 +953,7 @@ public class MulticastProcessor extends ServiceSupport 
implements AsyncProcessor
                 if (index > 0) {
                     // copy it otherwise parallel processing is not possible,
                     // because streams can only be read once
-                    StreamCache copiedStreamCache = streamCache.copy();
+                    StreamCache copiedStreamCache = streamCache.copy(copy);
                     if (copiedStreamCache != null) {
                         copy.getIn().setBody(copiedStreamCache);  
                     }

http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java 
b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index a74e663..1d6b835 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -180,7 +180,7 @@ public class WireTapProcessor extends ServiceSupport 
implements AsyncProcessor,
         if (msg.getBody() instanceof StreamCache) {
             // in parallel processing case, the stream must be copied, 
therefore get the stream
             StreamCache cache = (StreamCache) msg.getBody();
-            StreamCache copied = cache.copy();
+            StreamCache copied = cache.copy(answer);
             if (copied != null) {
                 msg.setBody(copied);
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/test/java/org/apache/camel/processor/WireTapStreamCachingTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/WireTapStreamCachingTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/WireTapStreamCachingTest.java
index 0a87c13..1db7307 100644
--- 
a/camel-core/src/test/java/org/apache/camel/processor/WireTapStreamCachingTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/WireTapStreamCachingTest.java
@@ -17,6 +17,7 @@
 package org.apache.camel.processor;
 
 import java.io.StringReader;
+
 import javax.xml.transform.stream.StreamSource;
 
 import org.apache.camel.ContextTestSupport;
@@ -26,6 +27,7 @@ import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
 
 /**
  * @version 
@@ -51,7 +53,19 @@ public class WireTapStreamCachingTest extends 
ContextTestSupport {
 
         assertMockEndpointsSatisfied();
     }
+    
+    @Test
+    public void 
testSendingAMessageUsingWiretapShouldNotDeleteStreamFileBeforeAllExcangesAreComplete()
 throws InterruptedException {
+
+        x.expectedMessageCount(1);
+        y.expectedMessageCount(1);
+        z.expectedMessageCount(1);
+
+        // the used file should contain more than one character in order to be 
streamed into the file system
+        template.sendBody("direct:a", 
this.getClass().getClassLoader().getResourceAsStream("org/apache/camel/processor/twoCharacters.txt"));
 
+        assertMockEndpointsSatisfied();
+    }
 
     @Override
     protected void setUp() throws Exception {
@@ -76,6 +90,8 @@ public class WireTapStreamCachingTest extends 
ContextTestSupport {
             public void configure() {
                 // enable stream caching
                 context.setStreamCaching(true);
+                // set stream threshold to 1, in order to stream into the file 
system
+                context.getStreamCachingStrategy().setSpoolThreshold(1);
 
                 
errorHandler(deadLetterChannel("mock:error").redeliveryDelay(0).maximumRedeliveries(3));
 
@@ -83,7 +99,8 @@ public class WireTapStreamCachingTest extends 
ContextTestSupport {
                 
from("direct:a").wireTap("direct:x").wireTap("direct:y").wireTap("direct:z");
 
                 from("direct:x").process(processor).to("mock:x");
-                from("direct:y").process(processor).to("mock:y");
+                // even if a process takes more time than the others the wire 
tap shall work
+                from("direct:y").delay(2000).process(processor).to("mock:y");
                 from("direct:z").process(processor).to("mock:z");
             }
         };

http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java 
b/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java
index 7448989..cf80614 100644
--- a/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java
+++ b/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java
@@ -63,7 +63,7 @@ public class MessageHelperTest extends TestCase {
                 // noop
             }
 
-            public StreamCache copy() throws IOException {
+            public StreamCache copy(Exchange exchange) throws IOException {
                 return null;
             }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/test/resources/org/apache/camel/processor/twoCharacters.txt
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/resources/org/apache/camel/processor/twoCharacters.txt 
b/camel-core/src/test/resources/org/apache/camel/processor/twoCharacters.txt
new file mode 100644
index 0000000..dfc9179
--- /dev/null
+++ b/camel-core/src/test/resources/org/apache/camel/processor/twoCharacters.txt
@@ -0,0 +1 @@
+AB
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java
 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java
index 8ecb8f7..b3afc4a 100644
--- 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java
+++ 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.StreamCache;
 import org.apache.camel.util.IOHelper;
 import org.jboss.netty.buffer.ChannelBuffer;
@@ -87,7 +88,7 @@ public final class NettyChannelBufferStreamCache extends 
InputStream implements
     }
 
     @Override
-    public StreamCache copy() throws IOException {
+    public StreamCache copy(Exchange exchange) throws IOException {
         return new NettyChannelBufferStreamCache(buffer.copy());
     }
 

Reply via email to