Repository: camel Updated Branches: refs/heads/master a09587b3b -> 4d827f208
CAMEL-8284: MultiCast in Parallel Processing Mode with StreamCache leads to wrong results. Thanks to Franz Forsthofer for the patch. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4d827f20 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4d827f20 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4d827f20 Branch: refs/heads/master Commit: 4d827f2087fd44e37fea3ca98c701def8bfa3e6e Parents: a09587b Author: Claus Ibsen <davscl...@apache.org> Authored: Wed Feb 18 15:54:46 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Feb 18 15:54:46 2015 +0100 ---------------------------------------------------------------------- .../apache/camel/ParallelProcessableStream.java | 46 ++++ .../stream/ByteArrayInputStreamCache.java | 24 +- .../converter/stream/CachedOutputStream.java | 42 ++-- .../converter/stream/FileInputStreamCache.java | 57 ++++- .../converter/stream/InputStreamCache.java | 16 +- .../camel/converter/stream/ReaderCache.java | 8 +- .../camel/converter/stream/SourceCache.java | 2 + .../converter/stream/StreamSourceCache.java | 46 ++-- .../camel/processor/MulticastProcessor.java | 18 ++ .../MultiCastParallelAndStreamCachingTest.java | 246 +++++++++++++++++++ .../org/apache/camel/processor/oneCharacter.txt | 1 + 11 files changed, 451 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/4d827f20/camel-core/src/main/java/org/apache/camel/ParallelProcessableStream.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/ParallelProcessableStream.java b/camel-core/src/main/java/org/apache/camel/ParallelProcessableStream.java new file mode 100644 index 0000000..7f03967 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/ParallelProcessableStream.java @@ -0,0 +1,46 @@ +/** + * Licensed to the 
Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel; + +import java.io.IOException; + +/** + * Tagging interface to indicate that a stream can be used in parallel + * processing by offering a copy method. + * <p/> + * This is a useful feature for avoiding message re-readability issues which can + * occur when the same message is processed by several threads. This interface + * is mainly used by the {@link org.apache.camel.processor.MulticastProcessor} + * and should be implemented by all implementers of + * {@link org.apache.camel.StreamCache} + * + * @version + */ +public interface ParallelProcessableStream { + + /** + * Create a copy of the stream. If possible use the same cached data in the + * copied instance. + * <p> + * This method is useful for parallel processing. 
+ * + * @throws java.io.IOException + * if the copy fails + */ + ParallelProcessableStream copy() throws IOException; + +} http://git-wip-us.apache.org/repos/asf/camel/blob/4d827f20/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java index ba91472..d72968f 100644 --- a/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java +++ b/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java @@ -17,19 +17,23 @@ package org.apache.camel.converter.stream; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.FilterInputStream; import java.io.IOException; import java.io.OutputStream; +import org.apache.camel.ParallelProcessableStream; import org.apache.camel.StreamCache; import org.apache.camel.util.IOHelper; /** * A {@link StreamCache} for {@link java.io.ByteArrayInputStream} */ -public class ByteArrayInputStreamCache extends FilterInputStream implements StreamCache { +public class ByteArrayInputStreamCache extends FilterInputStream implements StreamCache, ParallelProcessableStream { private final int length; + + private byte[] byteArrayForCopy; public ByteArrayInputStreamCache(ByteArrayInputStream in) { super(in); @@ -57,4 +61,22 @@ public class ByteArrayInputStreamCache extends FilterInputStream implements Stre public long length() { return length; } + + /** + * Transforms to InputStreamCache by copying the byte array. An + * InputStreamCache can be copied in such a way that the underlying byte + * array is kept. 
+ */ + @Override + public ParallelProcessableStream copy() throws IOException { + if (byteArrayForCopy == null) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(in.available()); + IOHelper.copy(this, baos); + // reset so that the stream can be reused + reset(); + // cache the byte array, in order not to copy the byte array in the next call again + byteArrayForCopy = baos.toByteArray(); + } + return new InputStreamCache(byteArrayForCopy); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/4d827f20/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java index 616b2ed..639e339 100644 --- a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java +++ b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java @@ -30,6 +30,7 @@ import javax.crypto.CipherOutputStream; import org.apache.camel.Exchange; import org.apache.camel.StreamCache; +import org.apache.camel.converter.stream.FileInputStreamCache.FileInputStreamCloser; import org.apache.camel.spi.StreamCachingStrategy; import org.apache.camel.spi.Synchronization; import org.apache.camel.spi.UnitOfWork; @@ -48,8 +49,8 @@ import org.slf4j.LoggerFactory; * system property of "java.io.tmpdir". * <p/> * You can get a cached input stream of this stream. The temp file which is created with this - * output stream will be deleted when you close this output stream or the all cached - * fileInputStream is closed after the exchange is completed. + * output stream will be deleted when you close this output stream or the cached + * fileInputStream(s) is/are closed after the exchange is completed. 
*/ public class CachedOutputStream extends OutputStream { @Deprecated @@ -68,6 +69,7 @@ public class CachedOutputStream extends OutputStream { private int totalLength; private File tempFile; private FileInputStreamCache fileInputStreamCache; + private final FileInputStreamCloser fileInputStreamCloser = new FileInputStreamCloser(); private CipherPair ciphers; private final boolean closedOnCompletion; @@ -85,9 +87,7 @@ public class CachedOutputStream extends OutputStream { @Override public void onDone(Exchange exchange) { try { - if (fileInputStreamCache != null) { - fileInputStreamCache.close(); - } + closeFileInputStreams(); close(); try { cleanUpTempFile(); @@ -127,9 +127,7 @@ public class CachedOutputStream extends OutputStream { currentStream.close(); // need to clean up the temp file this time if (!closedOnCompletion) { - if (fileInputStreamCache != null) { - fileInputStreamCache.close(); - } + closeFileInputStreams(); try { cleanUpTempFile(); } catch (Exception e) { @@ -179,29 +177,12 @@ public class CachedOutputStream extends OutputStream { } public InputStream getInputStream() throws IOException { - flush(); - - if (inMemory) { - if (currentStream instanceof CachedByteArrayOutputStream) { - return ((CachedByteArrayOutputStream) currentStream).newInputStreamCache(); - } else { - throw new IllegalStateException("CurrentStream should be an instance of CachedByteArrayOutputStream but is: " + currentStream.getClass().getName()); - } - } else { - try { - if (fileInputStreamCache == null) { - fileInputStreamCache = new FileInputStreamCache(tempFile, ciphers); - } - return fileInputStreamCache; - } catch (FileNotFoundException e) { - throw new IOException("Cached file " + tempFile + " not found", e); - } - } + return (InputStream)newStreamCache(); } public InputStream getWrappedInputStream() throws IOException { // The WrappedInputStream will close the CachedOutputStream when it is closed - return new WrappedInputStream(this, getInputStream()); + return new 
WrappedInputStream(this, (InputStream)newStreamCache()); } /** @@ -227,7 +208,7 @@ public class CachedOutputStream extends OutputStream { } else { try { if (fileInputStreamCache == null) { - fileInputStreamCache = new FileInputStreamCache(tempFile, ciphers); + fileInputStreamCache = new FileInputStreamCache(tempFile, ciphers, fileInputStreamCloser); } return fileInputStreamCache; } catch (FileNotFoundException e) { @@ -235,6 +216,11 @@ public class CachedOutputStream extends OutputStream { } } } + + private void closeFileInputStreams() { + fileInputStreamCloser.close(); + fileInputStreamCache = null; + } private void cleanUpTempFile() { // cleanup temporary file http://git-wip-us.apache.org/repos/asf/camel/blob/4d827f20/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java index a298f40..97dc0b5 100644 --- a/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java +++ b/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java @@ -26,8 +26,12 @@ import java.io.OutputStream; import java.nio.channels.Channels; import java.nio.channels.FileChannel; import java.nio.channels.WritableByteChannel; +import java.util.ArrayList; +import java.util.List; + import javax.crypto.CipherInputStream; +import org.apache.camel.ParallelProcessableStream; import org.apache.camel.RuntimeCamelException; import org.apache.camel.StreamCache; import org.apache.camel.util.IOHelper; @@ -35,21 +39,24 @@ import org.apache.camel.util.IOHelper; /** * A {@link StreamCache} for {@link File}s */ -public final class FileInputStreamCache extends InputStream implements StreamCache { +public final class FileInputStreamCache extends InputStream implements StreamCache, 
ParallelProcessableStream { private InputStream stream; private final File file; private final CipherPair ciphers; private final long length; + private final FileInputStreamCache.FileInputStreamCloser closer; public FileInputStreamCache(File file) throws FileNotFoundException { - this(file, null); + this(file, null, new FileInputStreamCloser()); } - FileInputStreamCache(File file, CipherPair ciphers) throws FileNotFoundException { + FileInputStreamCache(File file, CipherPair ciphers, FileInputStreamCloser closer) throws FileNotFoundException { this.file = file; this.stream = null; this.ciphers = ciphers; this.length = file.length(); + this.closer = closer; + this.closer.add(this); } @Override @@ -134,5 +141,49 @@ public final class FileInputStreamCache extends InputStream implements StreamCac } return in; } + + /** Creates a copy which uses the same underlying file + * and which has the same life cycle as the original instance (for example, + * will be closed automatically when the route is finished). + */ + @Override + public ParallelProcessableStream copy() throws IOException { + FileInputStreamCache copy = new FileInputStreamCache(file, ciphers, closer); + return copy; + } + + + /** + * Collects all FileInputStreamCache instances of a temporary file which must be closed + * at the end of the route. + * + * @see CachedOutputStream + * + */ + static class FileInputStreamCloser { + + // there can be several input streams, for example in the multi-cast parallel processing + private List<FileInputStreamCache> fileInputStreamCaches; + + /** Adds a FileInputStreamCache instance to the closer. + * <p> + * Must be synchronized, because can be accessed by several threads. 
+ */ + synchronized void add(FileInputStreamCache fileInputStreamCache) { + if (fileInputStreamCaches == null) { + fileInputStreamCaches = new ArrayList<FileInputStreamCache>(3); + } + fileInputStreamCaches.add(fileInputStreamCache); + } + + void close() { + if (fileInputStreamCaches != null) { + for (FileInputStreamCache fileInputStreamCache : fileInputStreamCaches) { + fileInputStreamCache.close(); + } + fileInputStreamCaches.clear(); + } + } + } } http://git-wip-us.apache.org/repos/asf/camel/blob/4d827f20/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java index ff0f9ce..e050498 100644 --- a/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java +++ b/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java @@ -20,12 +20,13 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.OutputStream; +import org.apache.camel.ParallelProcessableStream; import org.apache.camel.StreamCache; /** * A {@link StreamCache} for caching using an in-memory byte array. */ -public final class InputStreamCache extends ByteArrayInputStream implements StreamCache { +public final class InputStreamCache extends ByteArrayInputStream implements StreamCache, ParallelProcessableStream { public InputStreamCache(byte[] data) { super(data); @@ -35,6 +36,13 @@ public final class InputStreamCache extends ByteArrayInputStream implements Stre super(data); super.count = count; } + + private InputStreamCache(byte[] data, int pos, int count) { + super(data); + super.pos = pos; + super.count = count; + // you cannot use super(data,off,len), because then mark is set to off! 
+ } public void writeTo(OutputStream os) throws IOException { os.write(buf, pos, count - pos); @@ -47,4 +55,10 @@ public final class InputStreamCache extends ByteArrayInputStream implements Stre public long length() { return count; } + + /** Creates a new InputStream using the same underlying cache. */ + @Override + public ParallelProcessableStream copy() { + return new InputStreamCache(buf, super.pos, super.count); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/4d827f20/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java index 9dbcad4..5c0cf23 100644 --- a/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java +++ b/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java @@ -20,12 +20,13 @@ import java.io.IOException; import java.io.OutputStream; import java.io.StringReader; +import org.apache.camel.ParallelProcessableStream; import org.apache.camel.StreamCache; /** * A {@link org.apache.camel.StreamCache} for String {@link java.io.Reader}s */ -public class ReaderCache extends StringReader implements StreamCache { +public class ReaderCache extends StringReader implements StreamCache, ParallelProcessableStream { private final String data; @@ -63,4 +64,9 @@ public class ReaderCache extends StringReader implements StreamCache { return data; } + @Override + public ParallelProcessableStream copy() throws IOException { + return new ReaderCache(data); + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/4d827f20/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java 
b/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java index 8ac7c22..c25af28 100644 --- a/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java +++ b/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java @@ -25,6 +25,8 @@ import org.apache.camel.util.IOHelper; /** * {@link org.apache.camel.StreamCache} implementation for {@link org.apache.camel.StringSource}s + * Remark: It is not necessary to implement {@link org.apache.camel.ParallelProcessableStream} + * because this source can be used in several threads. */ public final class SourceCache extends StringSource implements StreamCache { http://git-wip-us.apache.org/repos/asf/camel/blob/4d827f20/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java index caa1adc..7d04835 100644 --- a/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java +++ b/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java @@ -16,23 +16,22 @@ */ package org.apache.camel.converter.stream; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; + import javax.xml.transform.stream.StreamSource; import org.apache.camel.Exchange; -import org.apache.camel.RuntimeCamelException; +import org.apache.camel.ParallelProcessableStream; import org.apache.camel.StreamCache; import org.apache.camel.util.IOHelper; /** * A {@link org.apache.camel.StreamCache} for {@link javax.xml.transform.stream.StreamSource}s */ -public final class StreamSourceCache extends StreamSource implements StreamCache { +public final class StreamSourceCache extends StreamSource implements StreamCache, 
ParallelProcessableStream { - private final InputStream stream; private final StreamCache streamCache; private final ReaderCache readCache; @@ -44,19 +43,29 @@ public final class StreamSourceCache extends StreamSource implements StreamCache streamCache = cos.newStreamCache(); readCache = null; setSystemId(source.getSystemId()); - stream = (InputStream) streamCache; + setInputStream((InputStream) streamCache); } else if (source.getReader() != null) { String data = exchange.getContext().getTypeConverter().convertTo(String.class, exchange, source.getReader()); readCache = new ReaderCache(data); streamCache = null; setReader(readCache); - stream = new ByteArrayInputStream(data.getBytes()); } else { streamCache = null; readCache = null; - stream = null; } } + + private StreamSourceCache(StreamCache streamCache) { + this.streamCache = streamCache; + setInputStream((InputStream) streamCache); + this.readCache = null; + } + + private StreamSourceCache(ReaderCache readCache) { + this.streamCache = null; + this.readCache = readCache; + setReader(readCache); + } public void reset() { if (streamCache != null) { @@ -65,13 +74,6 @@ public final class StreamSourceCache extends StreamSource implements StreamCache if (readCache != null) { readCache.reset(); } - if (stream != null) { - try { - stream.reset(); - } catch (IOException e) { - throw new RuntimeCamelException(e); - } - } } public void writeTo(OutputStream os) throws IOException { @@ -104,14 +106,16 @@ public final class StreamSourceCache extends StreamSource implements StreamCache } } + @Override - public InputStream getInputStream() { - return stream; - } - - @Override - public void setInputStream(InputStream inputStream) { - // noop as the input stream is from stream or reader cache + public ParallelProcessableStream copy() throws IOException { + if (streamCache != null) { + return new StreamSourceCache((StreamCache)((ParallelProcessableStream)streamCache).copy()); + } + if (readCache != null) { + return new 
StreamSourceCache((ReaderCache) readCache.copy()); + } + return null; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/4d827f20/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java index ee41a20..f42d45a 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -43,6 +43,7 @@ import org.apache.camel.Endpoint; import org.apache.camel.ErrorHandlerFactory; import org.apache.camel.Exchange; import org.apache.camel.Navigate; +import org.apache.camel.ParallelProcessableStream; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.Traceable; @@ -865,11 +866,28 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception { List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>(processors.size()); + ParallelProcessableStream parallelProcessableStream = null; + if (isParallelProcessing() && exchange.getIn().getBody() instanceof ParallelProcessableStream) { + // in parallel processing case, the stream must be copied, therefore get the stream + parallelProcessableStream = (ParallelProcessableStream) exchange.getIn().getBody(); + } + int index = 0; for (Processor processor : processors) { // copy exchange, and do not share the unit of work Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false); + if (parallelProcessableStream != null) { + if (index > 0) { + // copy it otherwise parallel processing is not possible, + // because streams can only be read once + 
ParallelProcessableStream copiedStreamCache = parallelProcessableStream.copy(); + if (copiedStreamCache != null) { + copy.getIn().setBody(copiedStreamCache); + } + } + } + // If the multi-cast processor has an aggregation strategy // then the StreamCache created by the child routes must not be // closed by the unit of work of the child route, but by the unit of http://git-wip-us.apache.org/repos/asf/camel/blob/4d827f20/camel-core/src/test/java/org/apache/camel/processor/MultiCastParallelAndStreamCachingTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/MultiCastParallelAndStreamCachingTest.java b/camel-core/src/test/java/org/apache/camel/processor/MultiCastParallelAndStreamCachingTest.java new file mode 100644 index 0000000..247a529 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/MultiCastParallelAndStreamCachingTest.java @@ -0,0 +1,246 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; + +import javax.xml.transform.sax.SAXSource; +import javax.xml.transform.stream.StreamSource; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.StringSource; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.util.IOHelper; + +/** + * Tests the processing of a stream-cache in the multi-cast processor in the + * parallel processing mode. + */ +public class MultiCastParallelAndStreamCachingTest extends ContextTestSupport { + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + context.setStreamCaching(true); + context.getStreamCachingStrategy().setEnabled(true); + context.getStreamCachingStrategy().setSpoolDirectory("target/camel/cache"); + context.getStreamCachingStrategy().setSpoolThreshold(5L); + + from("direct:start").multicast().parallelProcessing().stopOnException().to("direct:a", "direct:b").end().to("mock:result"); + + from("direct:a") // + // read stream + .process(new SimpleProcessor(false)).to("mock:resulta"); + + from("direct:b") // + // read stream concurrently, because of parallel processing + .process(new SimpleProcessor(true)).to("mock:resultb"); + + } + }; + } + + private static class SimpleProcessor implements Processor { + + private final boolean withSleepTime; + + SimpleProcessor(boolean withSleepTime) { + this.withSleepTime = withSleepTime; + } + + @Override + public void process(Exchange exchange) throws Exception { + + if (withSleepTime) { + // simulate some processing in order to get easier concurrency effects + Thread.sleep(900); + } + Object body = 
exchange.getIn().getBody(); + if (body instanceof InputStream) { + ByteArrayOutputStream output = new ByteArrayOutputStream(); + IOHelper.copy((InputStream) body, output); + exchange.getOut().setBody(output.toByteArray()); + } else if (body instanceof Reader) { + Reader reader = (Reader) body; + StringBuilder sb = new StringBuilder(); + for (int i = reader.read(); i > -1; i = reader.read()) { + sb.append((char) i); + } + reader.close(); + exchange.getOut().setBody(sb.toString()); + } else if (body instanceof StreamSource) { + StreamSource ss = (StreamSource) body; + if (ss.getInputStream() != null) { + ByteArrayOutputStream output = new ByteArrayOutputStream(); + IOHelper.copy(ss.getInputStream(), output); + exchange.getOut().setBody(output.toByteArray()); + } else if (ss.getReader() != null) { + Reader reader = (Reader) ss.getReader(); + StringBuilder sb = new StringBuilder(); + for (int i = reader.read(); i > -1; i = reader.read()) { + sb.append((char) i); + } + reader.close(); + exchange.getOut().setBody(sb.toString()); + } else { + throw new RuntimeException("StreamSource without InputStream and without Reader not supported"); + } + } else { + throw new RuntimeException("Type " + body.getClass().getName() + " not supported"); + } + + } + } + + /** + * Tests the ByteArrayInputStreamCache. The send byte array is transformed + * to a ByteArrayInputStreamCache before the multi-cast processor is called. + * + * @throws Exception + */ + public void testByteArrayInputStreamCache() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:resulta"); + mock.expectedBodiesReceived("<start></start>"); + mock = getMockEndpoint("mock:resultb"); + mock.expectedBodiesReceived("<start></start>"); + + template.sendBody("direct:start", new ByteArrayInputStream("<start></start>".getBytes("UTF-8"))); + + assertMockEndpointsSatisfied(); + } + + /** + * Tests the FileInputStreamCache. 
+ * + * The sent input stream is transformed to FileInputStreamCache before the + * multi-cast processor is called. + * + * @throws Exception + */ + public void testFileInputStreamCache() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:resulta"); + mock.expectedBodiesReceived("James,Guillaume,Hiram,Rob,Roman"); + mock = getMockEndpoint("mock:resultb"); + mock.expectedBodiesReceived("James,Guillaume,Hiram,Rob,Roman"); + + InputStream in = MultiCastParallelAndStreamCachingTest.class.getClassLoader().getResourceAsStream( + "org/apache/camel/processor/simple.txt"); + template.sendBody("direct:start", in); + + assertMockEndpointsSatisfied(); + } + + /** + * Tests the FileInputStreamCache. + * + * The sent input stream is transformed to InputStreamCache before the + * multi-cast processor is called. + * + * @throws Exception + */ + public void testInputStreamCache() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:resulta"); + mock.expectedBodiesReceived("A"); + mock = getMockEndpoint("mock:resultb"); + mock.expectedBodiesReceived("A"); + + InputStream in = MultiCastParallelAndStreamCachingTest.class.getClassLoader().getResourceAsStream( + "org/apache/camel/processor/oneCharacter.txt"); + // The body is only one character. Therefore InputStreamCache is used for stream caching + template.sendBody("direct:start", in); + + assertMockEndpointsSatisfied(); + } + + /** + * Tests the ReaderCache. + * + * The sent InputStreamReader is transformed to a ReaderCache before the + * multi-cast processor is called. 
+ * + * @throws Exception + */ + public void testReaderCache() throws Exception { + + String abcScharpS = "ABCß"; // sharp-s + MockEndpoint mock = getMockEndpoint("mock:resulta"); + mock.expectedBodiesReceived(abcScharpS); + mock = getMockEndpoint("mock:resultb"); + mock.expectedBodiesReceived(abcScharpS); + + InputStreamReader isr = new InputStreamReader(new ByteArrayInputStream(abcScharpS.getBytes("ISO-8859-1")), "ISO-8859-1"); + template.sendBody("direct:start", isr); + + assertMockEndpointsSatisfied(); + } + + + public void testStreamSourceCacheWithInputStream() throws Exception { + String input = "<A>a</A>"; + + MockEndpoint mock = getMockEndpoint("mock:resulta"); + mock.expectedBodiesReceived(input); + mock = getMockEndpoint("mock:resultb"); + mock.expectedBodiesReceived(input); + + StreamSource ss = new StreamSource(new ByteArrayInputStream(input.getBytes("UTF-8"))); + template.sendBody("direct:start", ss); + + assertMockEndpointsSatisfied(); + + } + + public void testStreamSourceCacheWithReader() throws Exception { + String input = "ABCß"; // sharp-s + + MockEndpoint mock = getMockEndpoint("mock:resulta"); + mock.expectedBodiesReceived(input); + mock = getMockEndpoint("mock:resultb"); + mock.expectedBodiesReceived(input); + + InputStreamReader isr = new InputStreamReader(new ByteArrayInputStream(input.getBytes("ISO-8859-1")), "ISO-8859-1"); + StreamSource ss = new StreamSource(isr); + template.sendBody("direct:start", ss); + + assertMockEndpointsSatisfied(); + } + + public void testSourceCache() throws Exception { + String input = "<A>a</A>"; + + MockEndpoint mock = getMockEndpoint("mock:resulta"); + mock.expectedBodiesReceived(input); + mock = getMockEndpoint("mock:resultb"); + mock.expectedBodiesReceived(input); + + StringSource ss = new StringSource(input); + SAXSource saxSource = new SAXSource(SAXSource.sourceToInputSource(ss)); + template.sendBody("direct:start", saxSource); + + assertMockEndpointsSatisfied(); + } + +} 
http://git-wip-us.apache.org/repos/asf/camel/blob/4d827f20/camel-core/src/test/resources/org/apache/camel/processor/oneCharacter.txt ---------------------------------------------------------------------- diff --git a/camel-core/src/test/resources/org/apache/camel/processor/oneCharacter.txt b/camel-core/src/test/resources/org/apache/camel/processor/oneCharacter.txt new file mode 100644 index 0000000..8c7e5a6 --- /dev/null +++ b/camel-core/src/test/resources/org/apache/camel/processor/oneCharacter.txt @@ -0,0 +1 @@ +A \ No newline at end of file