Repository: camel Updated Branches: refs/heads/master bf794e830 -> cdadacb80
CAMEL-8284: Moved copy to StreamCache Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/72ed78df Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/72ed78df Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/72ed78df Branch: refs/heads/master Commit: 72ed78df6c1b5544723b0577cd535408404a65ed Parents: bf794e8 Author: Claus Ibsen <davscl...@apache.org> Authored: Sat Feb 21 10:33:24 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat Feb 21 10:33:24 2015 +0100 ---------------------------------------------------------------------- .../apache/camel/ParallelProcessableStream.java | 46 -------------------- .../main/java/org/apache/camel/StreamCache.java | 17 ++++++++ .../stream/ByteArrayInputStreamCache.java | 32 +++++--------- .../converter/stream/FileInputStreamCache.java | 22 +++------- .../converter/stream/InputStreamCache.java | 20 +++------ .../camel/converter/stream/ReaderCache.java | 12 +++-- .../camel/converter/stream/SourceCache.java | 7 +-- .../converter/stream/StreamSourceCache.java | 27 +++++------- .../camel/processor/MulticastProcessor.java | 12 ++--- .../apache/camel/util/MessageHelperTest.java | 4 ++ .../http/NettyChannelBufferStreamCache.java | 5 +++ .../http/NettyChannelBufferStreamCache.java | 8 +++- 12 files changed, 83 insertions(+), 129 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/72ed78df/camel-core/src/main/java/org/apache/camel/ParallelProcessableStream.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/ParallelProcessableStream.java b/camel-core/src/main/java/org/apache/camel/ParallelProcessableStream.java deleted file mode 100644 index 7f03967..0000000 --- a/camel-core/src/main/java/org/apache/camel/ParallelProcessableStream.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel; - -import java.io.IOException; - -/** - * Tagging interface to indicate that a stream can be used in parallel - * processing by offering a copy method. - * <p/> - * This is a useful feature for avoiding message re-readability issues which can - * occur when the same message is processed by several threads. This interface - * is mainly used by the {@link org.apache.camel.processor.MulticastProcessor} - * and should be implemented by all implementers of - * {@link org.apache.camel.StreamCache} - * - * @version - */ -public interface ParallelProcessableStream { - - /** - * Create a copy of the stream. If possible use the same cached data in the - * copied instance. - * <p> - * This method is useful for parallel processing. - * - * @throws java.io.IOException - * if the copy fails - */ - ParallelProcessableStream copy() throws IOException; - -} http://git-wip-us.apache.org/repos/asf/camel/blob/72ed78df/camel-core/src/main/java/org/apache/camel/StreamCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/StreamCache.java b/camel-core/src/main/java/org/apache/camel/StreamCache.java index 13ab6c1..ecd9736 100644 --- a/camel-core/src/main/java/org/apache/camel/StreamCache.java +++ b/camel-core/src/main/java/org/apache/camel/StreamCache.java @@ -28,6 +28,9 @@ import java.io.OutputStream; * <p/> * The Camel routing engine uses the {@link org.apache.camel.processor.CamelInternalProcessor.StreamCachingAdvice} * to apply the stream cache during routing. + * <p/> + * It is recommended in the {@link #copy()} method to let the copied stream start from the start. If the implementation + * does not support copy, then return <tt>null</tt>. * * @version */ @@ -49,6 +52,20 @@ public interface StreamCache { void writeTo(OutputStream os) throws IOException; /** + * Create a copy of the stream. If possible use the same cached data in the + * copied instance. + * <p/> + * This method is useful for parallel processing. + * <p/> + * Implementations note: A copy of the stream is recommended to read from the start + * of the stream. + * + * @return a copy, or <tt>null</tt> if copy is not possible + * @throws java.io.IOException is thrown if the copy fails + */ + StreamCache copy() throws IOException; + + /** * Whether this {@link StreamCache} is in memory only or * spooled to persistent storage such as files. */ http://git-wip-us.apache.org/repos/asf/camel/blob/72ed78df/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java index d72968f..89d1c59 100644 --- a/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java +++ b/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java @@ -22,17 +22,15 @@ import java.io.FilterInputStream; import java.io.IOException; import java.io.OutputStream; -import org.apache.camel.ParallelProcessableStream; import org.apache.camel.StreamCache; import org.apache.camel.util.IOHelper; /** * A {@link StreamCache} for {@link java.io.ByteArrayInputStream} */ -public class ByteArrayInputStreamCache extends FilterInputStream implements StreamCache, ParallelProcessableStream { +public class ByteArrayInputStreamCache extends FilterInputStream implements StreamCache { private final int length; - private byte[] byteArrayForCopy; public ByteArrayInputStreamCache(ByteArrayInputStream in) { @@ -40,6 +38,7 @@ public class ByteArrayInputStreamCache extends FilterInputStream implements Stre this.length = in.available(); } + @Override public void reset() { try { super.reset(); @@ -48,27 +47,11 @@ public class ByteArrayInputStreamCache extends FilterInputStream implements Stre } } - public void writeTo(OutputStream os) throws IOException { IOHelper.copyAndCloseInput(in, os); } - public boolean inMemory() { - return true; - } - - @Override - public long length() { - return length; - } - - /** - * Transforms to InputStreamCache by copying the byte array. An - * InputStreamCache can be copied in such a way that the underlying byte - * array is kept. - */ - @Override - public ParallelProcessableStream copy() throws IOException { + public StreamCache copy() throws IOException { if (byteArrayForCopy == null) { ByteArrayOutputStream baos = new ByteArrayOutputStream(in.available()); IOHelper.copy(this, baos); @@ -79,4 +62,13 @@ public class ByteArrayInputStreamCache extends FilterInputStream implements Stre } return new InputStreamCache(byteArrayForCopy); } + + public boolean inMemory() { + return true; + } + + @Override + public long length() { + return length; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/72ed78df/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java index 97dc0b5..5e40248 100644 --- a/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java +++ b/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java @@ -28,10 +28,8 @@ import java.nio.channels.FileChannel; import java.nio.channels.WritableByteChannel; import java.util.ArrayList; import java.util.List; - import javax.crypto.CipherInputStream; -import org.apache.camel.ParallelProcessableStream; import org.apache.camel.RuntimeCamelException; import org.apache.camel.StreamCache; import org.apache.camel.util.IOHelper; @@ -39,7 +37,7 @@ import org.apache.camel.util.IOHelper; /** * A {@link StreamCache} for {@link File}s */ -public final class FileInputStreamCache extends InputStream implements StreamCache, ParallelProcessableStream { +public final class FileInputStreamCache extends InputStream implements StreamCache { private InputStream stream; private final File file; private final CipherPair ciphers; @@ -101,6 +99,11 @@ public final class FileInputStreamCache extends InputStream implements StreamCac } } + public StreamCache copy() throws IOException { + FileInputStreamCache copy = new FileInputStreamCache(file, ciphers, closer); + return copy; + } + public boolean inMemory() { return false; } @@ -141,18 +144,7 @@ public final class FileInputStreamCache extends InputStream implements StreamCac } return in; } - - /** Creates a copy which uses the same underlying file - * and which has the same life cycle as the original instance (for example, - * will be closed automatically when the route is finished). - */ - @Override - public ParallelProcessableStream copy() throws IOException { - FileInputStreamCache copy = new FileInputStreamCache(file, ciphers, closer); - return copy; - } - - + /** * Collects all FileInputStreamCache instances of a temporary file which must be closed * at the end of the route. http://git-wip-us.apache.org/repos/asf/camel/blob/72ed78df/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java index e050498..92a1a4e 100644 --- a/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java +++ b/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java @@ -20,13 +20,12 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.OutputStream; -import org.apache.camel.ParallelProcessableStream; import org.apache.camel.StreamCache; /** * A {@link StreamCache} for caching using an in-memory byte array. */ -public final class InputStreamCache extends ByteArrayInputStream implements StreamCache, ParallelProcessableStream { +public final class InputStreamCache extends ByteArrayInputStream implements StreamCache { public InputStreamCache(byte[] data) { super(data); @@ -37,17 +36,14 @@ public final class InputStreamCache extends ByteArrayInputStream implements Stre super.count = count; } - private InputStreamCache(byte[] data, int pos, int count) { - super(data); - super.pos = pos; - super.count = count; - // you cannot use super(data,off,len), because then mark is set to off! - } - public void writeTo(OutputStream os) throws IOException { os.write(buf, pos, count - pos); } + public StreamCache copy() { + return new InputStreamCache(buf); + } + public boolean inMemory() { return true; } @@ -55,10 +51,4 @@ public final class InputStreamCache extends ByteArrayInputStream implements Stre public long length() { return count; } - - /** Creates a new InputStream using the same underlying cache. */ - @Override - public ParallelProcessableStream copy() { - return new InputStreamCache(buf, super.pos, super.count); - } } http://git-wip-us.apache.org/repos/asf/camel/blob/72ed78df/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java index 5c0cf23..bed761c 100644 --- a/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java +++ b/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java @@ -20,13 +20,12 @@ import java.io.IOException; import java.io.OutputStream; import java.io.StringReader; -import org.apache.camel.ParallelProcessableStream; import org.apache.camel.StreamCache; /** * A {@link org.apache.camel.StreamCache} for String {@link java.io.Reader}s */ -public class ReaderCache extends StringReader implements StreamCache, ParallelProcessableStream { +public class ReaderCache extends StringReader implements StreamCache { private final String data; @@ -52,6 +51,10 @@ public class ReaderCache extends StringReader implements StreamCache, ParallelPr os.write(data.getBytes()); } + public StreamCache copy() throws IOException { + return new ReaderCache(data); + } + public boolean inMemory() { return true; } @@ -64,9 +67,4 @@ public class ReaderCache extends StringReader implements StreamCache, ParallelPr return data; } - @Override - public ParallelProcessableStream copy() throws IOException { - return new ReaderCache(data); - } - } http://git-wip-us.apache.org/repos/asf/camel/blob/72ed78df/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java index c25af28..16f8422 100644 --- a/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java +++ b/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java @@ -25,8 +25,6 @@ import org.apache.camel.util.IOHelper; /** * {@link org.apache.camel.StreamCache} implementation for {@link org.apache.camel.StringSource}s - * Remark: It is not necessary to implement {@link org.apache.camel.ParallelProcessableStream} - * because this source can be used in several threads. */ public final class SourceCache extends StringSource implements StreamCache { @@ -46,11 +44,14 @@ public final class SourceCache extends StringSource implements StreamCache { IOHelper.copy(getInputStream(), os); } + public StreamCache copy() throws IOException { + return new SourceCache(getText()); + } + public boolean inMemory() { return true; } - @Override public long length() { return length; } http://git-wip-us.apache.org/repos/asf/camel/blob/72ed78df/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java index 7d04835..3367866 100644 --- a/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java +++ b/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java @@ -19,18 +19,16 @@ package org.apache.camel.converter.stream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; - import javax.xml.transform.stream.StreamSource; import org.apache.camel.Exchange; -import org.apache.camel.ParallelProcessableStream; import org.apache.camel.StreamCache; import org.apache.camel.util.IOHelper; /** * A {@link org.apache.camel.StreamCache} for {@link javax.xml.transform.stream.StreamSource}s */ -public final class StreamSourceCache extends StreamSource implements StreamCache, ParallelProcessableStream { +public final class StreamSourceCache extends StreamSource implements StreamCache { private final StreamCache streamCache; private final ReaderCache readCache; @@ -84,6 +82,16 @@ public final class StreamSourceCache extends StreamSource implements StreamCache } } + public StreamCache copy() throws IOException { + if (streamCache != null) { + return new StreamSourceCache(streamCache).copy(); + } + if (readCache != null) { + return new StreamSourceCache((ReaderCache) readCache.copy()); + } + return null; + } + public boolean inMemory() { if (streamCache != null) { return streamCache.inMemory(); @@ -95,6 +103,7 @@ public final class StreamSourceCache extends StreamSource implements StreamCache } } + public long length() { if (streamCache != null) { return streamCache.length(); @@ -106,16 +115,4 @@ public final class StreamSourceCache extends StreamSource implements StreamCache } } - - @Override - public ParallelProcessableStream copy() throws IOException { - if (streamCache != null) { - return new StreamSourceCache((StreamCache)((ParallelProcessableStream)streamCache).copy()); - } - if (readCache != null) { - return new StreamSourceCache((ReaderCache) readCache.copy()); - } - return null; - } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/72ed78df/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java index f42d45a..b953d08 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -43,9 +43,9 @@ import org.apache.camel.Endpoint; import org.apache.camel.ErrorHandlerFactory; import org.apache.camel.Exchange; import org.apache.camel.Navigate; -import org.apache.camel.ParallelProcessableStream; import org.apache.camel.Processor; import org.apache.camel.Producer; +import org.apache.camel.StreamCache; import org.apache.camel.Traceable; import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.processor.aggregate.CompletionAwareAggregationStrategy; @@ -866,10 +866,10 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception { List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>(processors.size()); - ParallelProcessableStream parallelProcessableStream = null; - if (isParallelProcessing() && exchange.getIn().getBody() instanceof ParallelProcessableStream) { + StreamCache streamCache = null; + if (isParallelProcessing() && exchange.getIn().getBody() instanceof StreamCache) { // in parallel processing case, the stream must be copied, therefore get the stream - parallelProcessableStream = (ParallelProcessableStream) exchange.getIn().getBody(); + streamCache = (StreamCache) exchange.getIn().getBody(); } int index = 0; @@ -877,11 +877,11 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor // copy exchange, and do not share the unit of work Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false); - if (parallelProcessableStream != null) { + if (streamCache != null) { if (index > 0) { // copy it otherwise parallel processing is not possible, // because streams can only be read once - ParallelProcessableStream copiedStreamCache = parallelProcessableStream.copy(); + StreamCache copiedStreamCache = streamCache.copy(); if (copiedStreamCache != null) { copy.getIn().setBody(copiedStreamCache); } http://git-wip-us.apache.org/repos/asf/camel/blob/72ed78df/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java b/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java index 4826c41..7448989 100644 --- a/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java +++ b/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java @@ -63,6 +63,10 @@ public class MessageHelperTest extends TestCase { // noop } + public StreamCache copy() throws IOException { + return null; + } + public boolean inMemory() { return true; } http://git-wip-us.apache.org/repos/asf/camel/blob/72ed78df/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java ---------------------------------------------------------------------- diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java index 344e88b..8ecb8f7 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java @@ -87,6 +87,11 @@ public final class NettyChannelBufferStreamCache extends InputStream implements } @Override + public StreamCache copy() throws IOException { + return new NettyChannelBufferStreamCache(buffer.copy()); + } + + @Override public boolean inMemory() { return true; } http://git-wip-us.apache.org/repos/asf/camel/blob/72ed78df/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java index 74ec6e8..d8a183f 100644 --- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java +++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java @@ -24,9 +24,8 @@ import io.netty.buffer.ByteBuf; import org.apache.camel.StreamCache; import org.apache.camel.util.IOHelper; - /** - * A {@link ChannelBuffer} which is exposed as an {@link InputStream} which makes it very + * A {@link ByteBuf} which is exposed as an {@link InputStream} which makes it very * easy to use by Camel and other Camel components. Also supported is {@link StreamCache} * which allows the data to be re-read for example when doing content based routing with XPath. */ @@ -88,6 +87,11 @@ public final class NettyChannelBufferStreamCache extends InputStream implements } @Override + public StreamCache copy() throws IOException { + return new NettyChannelBufferStreamCache(buffer.copy()); + } + + @Override public boolean inMemory() { return true; }