Repository: camel
Updated Branches:
  refs/heads/master a09587b3b -> 4d827f208


CAMEL-8284: MultiCast in Parallel Processing Mode with StreamCache leads to 
wrong results. Thanks to Franz Forsthofer for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4d827f20
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4d827f20
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4d827f20

Branch: refs/heads/master
Commit: 4d827f2087fd44e37fea3ca98c701def8bfa3e6e
Parents: a09587b
Author: Claus Ibsen <davscl...@apache.org>
Authored: Wed Feb 18 15:54:46 2015 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed Feb 18 15:54:46 2015 +0100

----------------------------------------------------------------------
 .../apache/camel/ParallelProcessableStream.java |  46 ++++
 .../stream/ByteArrayInputStreamCache.java       |  24 +-
 .../converter/stream/CachedOutputStream.java    |  42 ++--
 .../converter/stream/FileInputStreamCache.java  |  57 ++++-
 .../converter/stream/InputStreamCache.java      |  16 +-
 .../camel/converter/stream/ReaderCache.java     |   8 +-
 .../camel/converter/stream/SourceCache.java     |   2 +
 .../converter/stream/StreamSourceCache.java     |  46 ++--
 .../camel/processor/MulticastProcessor.java     |  18 ++
 .../MultiCastParallelAndStreamCachingTest.java  | 246 +++++++++++++++++++
 .../org/apache/camel/processor/oneCharacter.txt |   1 +
 11 files changed, 451 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/4d827f20/camel-core/src/main/java/org/apache/camel/ParallelProcessableStream.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/ParallelProcessableStream.java 
b/camel-core/src/main/java/org/apache/camel/ParallelProcessableStream.java
new file mode 100644
index 0000000..7f03967
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/ParallelProcessableStream.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+import java.io.IOException;
+
+/**
+ * Tagging interface to indicate that a stream can be used in parallel
+ * processing by offering a copy method.
+ * <p/>
+ * This is a useful feature for avoiding message re-readability issues which 
can
+ * occur when the same message is processed by several threads. This interface
+ * is mainly used by the {@link org.apache.camel.processor.MulticastProcessor}
+ * and should be implemented by all implementers of
+ * {@link org.apache.camel.StreamCache}
+ * 
+ * @version
+ */
+public interface ParallelProcessableStream {
+
+    /**
+     * Create a copy of the stream. If possible use the same cached data in the
+     * copied instance.
+     * <p>
+     * This method is useful for parallel processing.
+     * 
+     * @throws java.io.IOException
+     *             if the copy fails
+     */
+    ParallelProcessableStream copy() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/4d827f20/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java
 
b/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java
index ba91472..d72968f 100644
--- 
a/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java
+++ 
b/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java
@@ -17,19 +17,23 @@
 package org.apache.camel.converter.stream;
 
 import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 
+import org.apache.camel.ParallelProcessableStream;
 import org.apache.camel.StreamCache;
 import org.apache.camel.util.IOHelper;
 
 /**
  * A {@link StreamCache} for {@link java.io.ByteArrayInputStream}
  */
-public class ByteArrayInputStreamCache extends FilterInputStream implements 
StreamCache {
+public class ByteArrayInputStreamCache extends FilterInputStream implements 
StreamCache, ParallelProcessableStream {
 
     private final int length;
+    
+    private byte[] byteArrayForCopy;
 
     public ByteArrayInputStreamCache(ByteArrayInputStream in) {
         super(in);
@@ -57,4 +61,22 @@ public class ByteArrayInputStreamCache extends 
FilterInputStream implements Stre
     public long length() {
         return length;
     }
+
+    /**
+     * Transforms to InputStreamCache by copying the byte array. An
+     * InputStreamCache can be copied in such a way that the underlying byte
+     * array is kept.
+     */
+    @Override
+    public ParallelProcessableStream copy() throws IOException {
+        if (byteArrayForCopy == null) {
+            ByteArrayOutputStream baos = new 
ByteArrayOutputStream(in.available());
+            IOHelper.copy(this, baos);
+            // reset so that the stream can be reused
+            reset();
+            // cache the byte array, in order not to copy the byte array in 
the next call again
+            byteArrayForCopy = baos.toByteArray();
+        }
+        return new InputStreamCache(byteArrayForCopy);
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/4d827f20/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
 
b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
index 616b2ed..639e339 100644
--- 
a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
+++ 
b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
@@ -30,6 +30,7 @@ import javax.crypto.CipherOutputStream;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.StreamCache;
+import 
org.apache.camel.converter.stream.FileInputStreamCache.FileInputStreamCloser;
 import org.apache.camel.spi.StreamCachingStrategy;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.spi.UnitOfWork;
@@ -48,8 +49,8 @@ import org.slf4j.LoggerFactory;
  * system property of "java.io.tmpdir".
  * <p/>
  * You can get a cached input stream of this stream. The temp file which is 
created with this 
- * output stream will be deleted when you close this output stream or the all 
cached 
- * fileInputStream is closed after the exchange is completed.
+ * output stream will be deleted when you close this output stream or the 
cached 
+ * fileInputStream(s) is/are closed after the exchange is completed.
  */
 public class CachedOutputStream extends OutputStream {
     @Deprecated
@@ -68,6 +69,7 @@ public class CachedOutputStream extends OutputStream {
     private int totalLength;
     private File tempFile;
     private FileInputStreamCache fileInputStreamCache;
+    private final FileInputStreamCloser fileInputStreamCloser = new 
FileInputStreamCloser();
     private CipherPair ciphers;
     private final boolean closedOnCompletion;
 
@@ -85,9 +87,7 @@ public class CachedOutputStream extends OutputStream {
                 @Override
                 public void onDone(Exchange exchange) {
                     try {
-                        if (fileInputStreamCache != null) {
-                            fileInputStreamCache.close();
-                        }
+                        closeFileInputStreams();
                         close();
                         try {
                             cleanUpTempFile();
@@ -127,9 +127,7 @@ public class CachedOutputStream extends OutputStream {
         currentStream.close();
         // need to clean up the temp file this time
         if (!closedOnCompletion) {
-            if (fileInputStreamCache != null) {
-                fileInputStreamCache.close();
-            }
+            closeFileInputStreams();
             try {
                 cleanUpTempFile();
             } catch (Exception e) {
@@ -179,29 +177,12 @@ public class CachedOutputStream extends OutputStream {
     }
 
     public InputStream getInputStream() throws IOException {
-        flush();
-
-        if (inMemory) {
-            if (currentStream instanceof CachedByteArrayOutputStream) {
-                return ((CachedByteArrayOutputStream) 
currentStream).newInputStreamCache();
-            } else {
-                throw new IllegalStateException("CurrentStream should be an 
instance of CachedByteArrayOutputStream but is: " + 
currentStream.getClass().getName());
-            }
-        } else {
-            try {
-                if (fileInputStreamCache == null) {
-                    fileInputStreamCache = new FileInputStreamCache(tempFile, 
ciphers);
-                }
-                return fileInputStreamCache;
-            } catch (FileNotFoundException e) {
-                throw new IOException("Cached file " + tempFile + " not 
found", e);
-            }
-        }
+        return (InputStream)newStreamCache();
     }    
 
     public InputStream getWrappedInputStream() throws IOException {
         // The WrappedInputStream will close the CachedOutputStream when it is 
closed
-        return new WrappedInputStream(this, getInputStream());
+        return new WrappedInputStream(this, (InputStream)newStreamCache());
     }
 
     /**
@@ -227,7 +208,7 @@ public class CachedOutputStream extends OutputStream {
         } else {
             try {
                 if (fileInputStreamCache == null) {
-                    fileInputStreamCache = new FileInputStreamCache(tempFile, 
ciphers);
+                    fileInputStreamCache = new FileInputStreamCache(tempFile, 
ciphers, fileInputStreamCloser);
                 }
                 return fileInputStreamCache;
             } catch (FileNotFoundException e) {
@@ -235,6 +216,11 @@ public class CachedOutputStream extends OutputStream {
             }
         }
     }
+    
+    private void closeFileInputStreams() {
+        fileInputStreamCloser.close();
+        fileInputStreamCache = null;
+    } 
 
     private void cleanUpTempFile() {
         // cleanup temporary file

http://git-wip-us.apache.org/repos/asf/camel/blob/4d827f20/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
 
b/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
index a298f40..97dc0b5 100644
--- 
a/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
+++ 
b/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
@@ -26,8 +26,12 @@ import java.io.OutputStream;
 import java.nio.channels.Channels;
 import java.nio.channels.FileChannel;
 import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.List;
+
 import javax.crypto.CipherInputStream;
 
+import org.apache.camel.ParallelProcessableStream;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.StreamCache;
 import org.apache.camel.util.IOHelper;
@@ -35,21 +39,24 @@ import org.apache.camel.util.IOHelper;
 /**
  * A {@link StreamCache} for {@link File}s
  */
-public final class FileInputStreamCache extends InputStream implements 
StreamCache {
+public final class FileInputStreamCache extends InputStream implements 
StreamCache, ParallelProcessableStream {
     private InputStream stream;
     private final File file;
     private final CipherPair ciphers;
     private final long length;
+    private final FileInputStreamCache.FileInputStreamCloser closer;
 
     public FileInputStreamCache(File file) throws FileNotFoundException {
-        this(file, null);
+        this(file, null, new FileInputStreamCloser());
     }
     
-    FileInputStreamCache(File file, CipherPair ciphers) throws 
FileNotFoundException {
+    FileInputStreamCache(File file, CipherPair ciphers, FileInputStreamCloser 
closer) throws FileNotFoundException {
         this.file = file;
         this.stream = null;
         this.ciphers = ciphers;
         this.length = file.length();
+        this.closer = closer;
+        this.closer.add(this);
     }
     
     @Override
@@ -134,5 +141,49 @@ public final class FileInputStreamCache extends 
InputStream implements StreamCac
         }
         return in;
     }
+    
+    /** Creates a copy which uses the same underlying file
+     * and which has the same life cycle as the original instance (for example,
+     * will be closed automatically when the route is finished).
+     */
+    @Override
+    public ParallelProcessableStream copy() throws IOException {
+        FileInputStreamCache copy = new FileInputStreamCache(file, ciphers, 
closer);
+        return copy;
+    }
+    
+    
+    /** 
+     * Collects all FileInputStreamCache instances of a temporary file which 
must be closed
+     * at the end of the route.
+     * 
+     * @see CachedOutputStream
+     * 
+     */
+    static class FileInputStreamCloser {
+        
+        // there can be several input streams, for example in the multi-cast 
parallel processing
+        private List<FileInputStreamCache> fileInputStreamCaches;
+        
+        /** Adds a FileInputStreamCache instance to the closer.
+         * <p>
+         * Must be synchronized, because can be accessed by several threads. 
+         */
+        synchronized void  add(FileInputStreamCache fileInputStreamCache) {
+            if (fileInputStreamCaches == null) {
+                fileInputStreamCaches = new ArrayList<FileInputStreamCache>(3);
+            }
+            fileInputStreamCaches.add(fileInputStreamCache);
+        }
+        
+        void close() {
+            if (fileInputStreamCaches != null) {
+                for (FileInputStreamCache fileInputStreamCache : 
fileInputStreamCaches) {
+                    fileInputStreamCache.close();
+                }
+                fileInputStreamCaches.clear();
+            }
+        }
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/4d827f20/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
 
b/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
index ff0f9ce..e050498 100644
--- 
a/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
+++ 
b/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
@@ -20,12 +20,13 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 
+import org.apache.camel.ParallelProcessableStream;
 import org.apache.camel.StreamCache;
 
 /**
  * A {@link StreamCache} for caching using an in-memory byte array.
  */
-public final class InputStreamCache extends ByteArrayInputStream implements 
StreamCache {
+public final class InputStreamCache extends ByteArrayInputStream implements 
StreamCache, ParallelProcessableStream {
 
     public InputStreamCache(byte[] data) {
         super(data);
@@ -35,6 +36,13 @@ public final class InputStreamCache extends 
ByteArrayInputStream implements Stre
         super(data);
         super.count = count;
     }
+    
+    private InputStreamCache(byte[] data, int pos, int count) {
+        super(data);  
+        super.pos = pos;
+        super.count = count;
+        // you cannot use super(data,off,len), because then mark is set to off!
+    }
 
     public void writeTo(OutputStream os) throws IOException {
         os.write(buf, pos, count - pos);
@@ -47,4 +55,10 @@ public final class InputStreamCache extends 
ByteArrayInputStream implements Stre
     public long length() {
         return count;
     }
+    
+    /** Creates a new InputStream using the same underlying cache. */
+    @Override
+    public ParallelProcessableStream copy() {
+        return new InputStreamCache(buf, super.pos, super.count);
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/4d827f20/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java 
b/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
index 9dbcad4..5c0cf23 100644
--- 
a/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
+++ 
b/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
@@ -20,12 +20,13 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.io.StringReader;
 
+import org.apache.camel.ParallelProcessableStream;
 import org.apache.camel.StreamCache;
 
 /**
  * A {@link org.apache.camel.StreamCache} for String {@link java.io.Reader}s
  */
-public class ReaderCache extends StringReader implements StreamCache {
+public class ReaderCache extends StringReader implements StreamCache, 
ParallelProcessableStream {
 
     private final String data;
 
@@ -63,4 +64,9 @@ public class ReaderCache extends StringReader implements 
StreamCache {
         return data;
     }
 
+    @Override
+    public ParallelProcessableStream copy() throws IOException {
+        return new ReaderCache(data);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/4d827f20/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java 
b/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
index 8ac7c22..c25af28 100644
--- 
a/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
+++ 
b/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
@@ -25,6 +25,8 @@ import org.apache.camel.util.IOHelper;
 
 /**
  * {@link org.apache.camel.StreamCache} implementation for {@link 
org.apache.camel.StringSource}s
+ * Remark: It is not necessary to implement {@link 
org.apache.camel.ParallelProcessableStream}
+ * because this source can be used in several threads.
  */
 public final class SourceCache extends StringSource implements StreamCache {
 

http://git-wip-us.apache.org/repos/asf/camel/blob/4d827f20/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
 
b/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
index caa1adc..7d04835 100644
--- 
a/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
+++ 
b/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
@@ -16,23 +16,22 @@
  */
 package org.apache.camel.converter.stream;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+
 import javax.xml.transform.stream.StreamSource;
 
 import org.apache.camel.Exchange;
-import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.ParallelProcessableStream;
 import org.apache.camel.StreamCache;
 import org.apache.camel.util.IOHelper;
 
 /**
  * A {@link org.apache.camel.StreamCache} for {@link 
javax.xml.transform.stream.StreamSource}s
  */
-public final class StreamSourceCache extends StreamSource implements 
StreamCache {
+public final class StreamSourceCache extends StreamSource implements 
StreamCache, ParallelProcessableStream {
 
-    private final InputStream stream;
     private final StreamCache streamCache;
     private final ReaderCache readCache;
 
@@ -44,19 +43,29 @@ public final class StreamSourceCache extends StreamSource 
implements StreamCache
             streamCache = cos.newStreamCache();
             readCache = null;
             setSystemId(source.getSystemId());
-            stream = (InputStream) streamCache;
+            setInputStream((InputStream) streamCache);
         } else if (source.getReader() != null) {
             String data = 
exchange.getContext().getTypeConverter().convertTo(String.class, exchange, 
source.getReader());
             readCache = new ReaderCache(data);
             streamCache = null;
             setReader(readCache);
-            stream = new ByteArrayInputStream(data.getBytes());
         } else {
             streamCache = null;
             readCache = null;
-            stream = null;
         }
     }
+    
+    private StreamSourceCache(StreamCache streamCache) {
+        this.streamCache = streamCache;
+        setInputStream((InputStream) streamCache);
+        this.readCache = null;
+    }
+    
+    private StreamSourceCache(ReaderCache readCache) {
+        this.streamCache = null;
+        this.readCache = readCache;
+        setReader(readCache);
+    }
 
     public void reset() {
         if (streamCache != null) {
@@ -65,13 +74,6 @@ public final class StreamSourceCache extends StreamSource 
implements StreamCache
         if (readCache != null) {
             readCache.reset();
         }
-        if (stream != null) {
-            try {
-                stream.reset();
-            } catch (IOException e) {
-                throw new RuntimeCamelException(e);
-            }
-        }
     }
 
     public void writeTo(OutputStream os) throws IOException {
@@ -104,14 +106,16 @@ public final class StreamSourceCache extends StreamSource 
implements StreamCache
         }
     }
 
+    
     @Override
-    public InputStream getInputStream() {
-        return stream;
-    }
-
-    @Override
-    public void setInputStream(InputStream inputStream) {
-        // noop as the input stream is from stream or reader cache
+    public ParallelProcessableStream copy() throws IOException {
+        if (streamCache != null) {
+            return  new 
StreamSourceCache((StreamCache)((ParallelProcessableStream)streamCache).copy());
+        }
+        if (readCache != null) {
+            return new StreamSourceCache((ReaderCache) readCache.copy());
+        }
+        return null;
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/4d827f20/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java 
b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index ee41a20..f42d45a 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -43,6 +43,7 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.ErrorHandlerFactory;
 import org.apache.camel.Exchange;
 import org.apache.camel.Navigate;
+import org.apache.camel.ParallelProcessableStream;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.Traceable;
@@ -865,11 +866,28 @@ public class MulticastProcessor extends ServiceSupport 
implements AsyncProcessor
     protected Iterable<ProcessorExchangePair> 
createProcessorExchangePairs(Exchange exchange) throws Exception {
         List<ProcessorExchangePair> result = new 
ArrayList<ProcessorExchangePair>(processors.size());
 
+        ParallelProcessableStream parallelProcessableStream = null;
+        if (isParallelProcessing() && exchange.getIn().getBody() instanceof 
ParallelProcessableStream) {
+            // in parallel processing case, the stream must be copied, 
therefore get the stream
+            parallelProcessableStream = (ParallelProcessableStream) 
exchange.getIn().getBody();
+        }
+
         int index = 0;
         for (Processor processor : processors) {
             // copy exchange, and do not share the unit of work
             Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, 
false);
 
+            if (parallelProcessableStream != null) {
+                if (index > 0) {
+                    // copy it otherwise parallel processing is not possible,
+                    // because streams can only be read once
+                    ParallelProcessableStream copiedStreamCache = 
parallelProcessableStream.copy();
+                    if (copiedStreamCache != null) {
+                        copy.getIn().setBody(copiedStreamCache);  
+                    }
+                }
+            }
+
             // If the multi-cast processor has an aggregation strategy
             // then the StreamCache created by the child routes must not be 
             // closed by the unit of work of the child route, but by the unit 
of 

http://git-wip-us.apache.org/repos/asf/camel/blob/4d827f20/camel-core/src/test/java/org/apache/camel/processor/MultiCastParallelAndStreamCachingTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/MultiCastParallelAndStreamCachingTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/MultiCastParallelAndStreamCachingTest.java
new file mode 100644
index 0000000..247a529
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/MultiCastParallelAndStreamCachingTest.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+
+import javax.xml.transform.sax.SAXSource;
+import javax.xml.transform.stream.StreamSource;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.StringSource;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.util.IOHelper;
+
+/**
+ * Tests the processing of a stream-cache in the multi-cast processor in the
+ * parallel processing mode.
+ */
+public class MultiCastParallelAndStreamCachingTest extends ContextTestSupport {
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.setStreamCaching(true);
+                context.getStreamCachingStrategy().setEnabled(true);
+                
context.getStreamCachingStrategy().setSpoolDirectory("target/camel/cache");
+                context.getStreamCachingStrategy().setSpoolThreshold(5L);
+
+                
from("direct:start").multicast().parallelProcessing().stopOnException().to("direct:a",
 "direct:b").end().to("mock:result");
+
+                from("direct:a") //
+                        // read stream
+                        .process(new 
SimpleProcessor(false)).to("mock:resulta");
+
+                from("direct:b") //
+                        // read stream concurrently, because of parallel 
processing
+                        .process(new SimpleProcessor(true)).to("mock:resultb");
+
+            }
+        };
+    }
+
+    private static class SimpleProcessor implements Processor {
+
+        private final boolean withSleepTime;
+
+        SimpleProcessor(boolean withSleepTime) {
+            this.withSleepTime = withSleepTime;
+        }
+
+        @Override
+        public void process(Exchange exchange) throws Exception {
+
+            if (withSleepTime) {
+                // simulate some processing in order to get easier concurrency 
effects
+                Thread.sleep(900);
+            }
+            Object body = exchange.getIn().getBody();
+            if (body instanceof InputStream) {
+                ByteArrayOutputStream output = new ByteArrayOutputStream();
+                IOHelper.copy((InputStream) body, output);
+                exchange.getOut().setBody(output.toByteArray());
+            } else if (body instanceof Reader) {
+                Reader reader = (Reader) body;
+                StringBuilder sb = new StringBuilder();
+                for (int i = reader.read(); i > -1; i = reader.read()) {
+                    sb.append((char) i);
+                }
+                reader.close();
+                exchange.getOut().setBody(sb.toString());
+            } else if (body instanceof StreamSource) {
+                StreamSource ss = (StreamSource) body;
+                if (ss.getInputStream() != null) {
+                    ByteArrayOutputStream output = new ByteArrayOutputStream();
+                    IOHelper.copy(ss.getInputStream(), output);
+                    exchange.getOut().setBody(output.toByteArray());
+                } else if (ss.getReader() != null) {
+                    Reader reader = (Reader) ss.getReader();
+                    StringBuilder sb = new StringBuilder();
+                    for (int i = reader.read(); i > -1; i = reader.read()) {
+                        sb.append((char) i);
+                    }
+                    reader.close();
+                    exchange.getOut().setBody(sb.toString());
+                } else {
+                    throw new RuntimeException("StreamSource without 
InputStream and without Reader not supported");
+                }
+            } else {
+                throw new RuntimeException("Type " + body.getClass().getName() 
+ " not supported");
+            }
+
+        }
+    }
+
+    /**
+     * Tests the ByteArrayInputStreamCache. The send byte array is transformed
+     * to a ByteArrayInputStreamCache before the multi-cast processor is 
called.
+     * 
+     * @throws Exception
+     */
+    public void testByteArrayInputStreamCache() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:resulta");
+        mock.expectedBodiesReceived("<start></start>");
+        mock = getMockEndpoint("mock:resultb");
+        mock.expectedBodiesReceived("<start></start>");
+
+        template.sendBody("direct:start", new 
ByteArrayInputStream("<start></start>".getBytes("UTF-8")));
+
+        assertMockEndpointsSatisfied();
+    }
+
+    /**
+     * Tests the FileInputStreamCache.
+     * 
+     * The sent input stream is transformed to FileInputStreamCache before the
+     * multi-cast processor is called.
+     * 
+     * @throws Exception
+     */
+    public void testFileInputStreamCache() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:resulta");
+        mock.expectedBodiesReceived("James,Guillaume,Hiram,Rob,Roman");
+        mock = getMockEndpoint("mock:resultb");
+        mock.expectedBodiesReceived("James,Guillaume,Hiram,Rob,Roman");
+
+        InputStream in = 
MultiCastParallelAndStreamCachingTest.class.getClassLoader().getResourceAsStream(
+                "org/apache/camel/processor/simple.txt");
+        template.sendBody("direct:start", in);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    /**
+     * Tests the FileInputStreamCache.
+     * 
+     * The sent input stream is transformed to InputStreamCache before the
+     * multi-cast processor is called.
+     * 
+     * @throws Exception
+     */
+    public void testInputStreamCache() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:resulta");
+        mock.expectedBodiesReceived("A");
+        mock = getMockEndpoint("mock:resultb");
+        mock.expectedBodiesReceived("A");
+
+        InputStream in = 
MultiCastParallelAndStreamCachingTest.class.getClassLoader().getResourceAsStream(
+                "org/apache/camel/processor/oneCharacter.txt");
+        // The body is only one character. Therefore InputStreamCache is used 
for stream caching
+        template.sendBody("direct:start", in);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    /**
+     * Tests the ReaderCache.
+     * 
+     * The sent InputStreamReader is transformed to a ReaderCache before the
+     * multi-cast processor is called.
+     * 
+     * @throws Exception
+     */
+    public void testReaderCache() throws Exception {
+
+        String abcScharpS = "ABCß"; // sharp-s
+        MockEndpoint mock = getMockEndpoint("mock:resulta");
+        mock.expectedBodiesReceived(abcScharpS);
+        mock = getMockEndpoint("mock:resultb");
+        mock.expectedBodiesReceived(abcScharpS);
+
+        InputStreamReader isr = new InputStreamReader(new 
ByteArrayInputStream(abcScharpS.getBytes("ISO-8859-1")), "ISO-8859-1");
+        template.sendBody("direct:start", isr);
+
+        assertMockEndpointsSatisfied();
+    }
+
+   
+    public void testStreamSourceCacheWithInputStream() throws Exception {
+        String input = "<A>a</A>";
+
+        MockEndpoint mock = getMockEndpoint("mock:resulta");
+        mock.expectedBodiesReceived(input);
+        mock = getMockEndpoint("mock:resultb");
+        mock.expectedBodiesReceived(input);
+
+        StreamSource ss = new StreamSource(new 
ByteArrayInputStream(input.getBytes("UTF-8")));
+        template.sendBody("direct:start", ss);
+
+        assertMockEndpointsSatisfied();
+
+    }
+    
+    public void testStreamSourceCacheWithReader() throws Exception {
+        String input = "ABCß"; // sharp-s
+
+        MockEndpoint mock = getMockEndpoint("mock:resulta");
+        mock.expectedBodiesReceived(input);
+        mock = getMockEndpoint("mock:resultb");
+        mock.expectedBodiesReceived(input);
+
+        InputStreamReader isr = new InputStreamReader(new 
ByteArrayInputStream(input.getBytes("ISO-8859-1")), "ISO-8859-1");
+        StreamSource ss = new StreamSource(isr);
+        template.sendBody("direct:start", ss);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testSourceCache() throws Exception {
+        String input = "<A>a</A>";
+
+        MockEndpoint mock = getMockEndpoint("mock:resulta");
+        mock.expectedBodiesReceived(input);
+        mock = getMockEndpoint("mock:resultb");
+        mock.expectedBodiesReceived(input);
+
+        StringSource ss = new StringSource(input);
+        SAXSource saxSource = new SAXSource(SAXSource.sourceToInputSource(ss));
+        template.sendBody("direct:start", saxSource);
+
+        assertMockEndpointsSatisfied();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/4d827f20/camel-core/src/test/resources/org/apache/camel/processor/oneCharacter.txt
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/resources/org/apache/camel/processor/oneCharacter.txt 
b/camel-core/src/test/resources/org/apache/camel/processor/oneCharacter.txt
new file mode 100644
index 0000000..8c7e5a6
--- /dev/null
+++ b/camel-core/src/test/resources/org/apache/camel/processor/oneCharacter.txt
@@ -0,0 +1 @@
+A
\ No newline at end of file

Reply via email to