CAMEL-6476: Introducing StreamCachingStrategy SPI to make it easier to configure and allow 3rd party to plugin custom strategies. Work in progress.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/323008c5 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/323008c5 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/323008c5 Branch: refs/heads/master Commit: 323008c505794b224c6908f623c889ed8a6f9b6f Parents: 94133b9 Author: Claus Ibsen <davscl...@apache.org> Authored: Sat Jul 20 13:52:40 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Jul 21 17:42:45 2013 +0200 ---------------------------------------------------------------------- .../main/java/org/apache/camel/StreamCache.java | 10 +++++++ .../ManagedStreamCachingStrategyMBean.java | 8 +++++- .../stream/ByteArrayInputStreamCache.java | 8 ++++++ .../converter/stream/FileInputStreamCache.java | 10 +++++-- .../converter/stream/InputStreamCache.java | 3 ++ .../camel/converter/stream/ReaderCache.java | 7 +++-- .../camel/converter/stream/SourceCache.java | 7 +++++ .../converter/stream/StreamSourceCache.java | 11 ++++++++ .../impl/DefaultStreamCachingStrategy.java | 29 ++++++++++++++++---- .../mbean/ManagedStreamCachingStrategy.java | 8 ++++++ .../apache/camel/spi/StreamCachingStrategy.java | 10 +++++++ .../ManagedStreamCachingStrategyTest.java | 6 ++++ .../apache/camel/util/MessageHelperTest.java | 5 ++++ .../http/NettyChannelBufferStreamCache.java | 10 +++++++ .../camel/karaf/commands/ContextInfo.java | 6 ++-- 15 files changed, 125 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/323008c5/camel-core/src/main/java/org/apache/camel/StreamCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/StreamCache.java b/camel-core/src/main/java/org/apache/camel/StreamCache.java index 089614a..5333bd3 100644 --- a/camel-core/src/main/java/org/apache/camel/StreamCache.java +++ b/camel-core/src/main/java/org/apache/camel/StreamCache.java @@ -51,4 +51,14 @@ public interface StreamCache { */ boolean inMemory(); + /** + * Gets the length of the cached stream. + * <p/> + * The implementation may return <tt>0</tt> in cases where the length + * cannot be computed, or if the implementation does not support this. + * + * @return number of bytes in the cache. + */ + long length(); + } http://git-wip-us.apache.org/repos/asf/camel/blob/323008c5/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java index 70acf3f..7cf3bf4 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java @@ -53,4 +53,10 @@ public interface ManagedStreamCachingStrategyMBean { @ManagedAttribute(description = "Number of spooled (not in-memory) StreamCache created") long getCacheSpoolCounter(); -} + @ManagedAttribute(description = "Total accumulated number of bytes which has been stream cached for in-memory StreamCache") + long getCacheMemorySize(); + + @ManagedAttribute(description = "Total accumulated number of bytes which has been stream cached for spooled StreamCache") + long getCacheSpoolSize(); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/323008c5/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java index ba41c27..ba91472 100644 --- a/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java +++ b/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java @@ -29,8 +29,11 @@ import org.apache.camel.util.IOHelper; */ public class ByteArrayInputStreamCache extends FilterInputStream implements StreamCache { + private final int length; + public ByteArrayInputStreamCache(ByteArrayInputStream in) { super(in); + this.length = in.available(); } public void reset() { @@ -49,4 +52,9 @@ public class ByteArrayInputStreamCache extends FilterInputStream implements Stre public boolean inMemory() { return true; } + + @Override + public long length() { + return length; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/323008c5/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java index 4494117..c794431 100644 --- a/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java +++ b/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java @@ -37,8 +37,9 @@ import org.apache.camel.util.IOHelper; */ public final class FileInputStreamCache extends InputStream implements StreamCache { private InputStream stream; - private File file; - private CipherPair ciphers; + private final File file; + private final CipherPair ciphers; + private final long length; public FileInputStreamCache(File file) throws FileNotFoundException { this(file, null); @@ -48,6 +49,7 @@ public final class FileInputStreamCache extends InputStream implements StreamCac this.file = file; this.stream = null; this.ciphers = ciphers; + this.length = file.length(); } @Override @@ -95,6 +97,10 @@ public final class FileInputStreamCache extends InputStream implements StreamCac return false; } + public long length() { + return length; + } + @Override public int available() throws IOException { return getInputStream().available(); http://git-wip-us.apache.org/repos/asf/camel/blob/323008c5/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java index d808215..ff0f9ce 100644 --- a/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java +++ b/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java @@ -44,4 +44,7 @@ public final class InputStreamCache extends ByteArrayInputStream implements Stre return true; } + public long length() { + return count; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/323008c5/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java index c4da171..9dbcad4 100644 --- a/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java +++ b/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java @@ -21,15 +21,12 @@ import java.io.OutputStream; import java.io.StringReader; import org.apache.camel.StreamCache; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * A {@link org.apache.camel.StreamCache} for String {@link java.io.Reader}s */ public class ReaderCache extends StringReader implements StreamCache { - private static final transient Logger LOG = LoggerFactory.getLogger(ReaderCache.class); private final String data; public ReaderCache(String data) { @@ -58,6 +55,10 @@ public class ReaderCache extends StringReader implements StreamCache { return true; } + public long length() { + return data.length(); + } + String getData() { return data; } http://git-wip-us.apache.org/repos/asf/camel/blob/323008c5/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java index 9b2bf01..8ac7c22 100644 --- a/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java +++ b/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java @@ -29,9 +29,11 @@ import org.apache.camel.util.IOHelper; public final class SourceCache extends StringSource implements StreamCache { private static final long serialVersionUID = 1L; + private final int length; public SourceCache(String data) { super(data); + this.length = data.length(); } public void reset() { @@ -45,4 +47,9 @@ public final class SourceCache extends StringSource implements StreamCache { public boolean inMemory() { return true; } + + @Override + public long length() { + return length; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/323008c5/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java index a50b8bc..caa1adc 100644 --- a/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java +++ b/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java @@ -93,6 +93,17 @@ public final class StreamSourceCache extends StreamSource implements StreamCache } } + public long length() { + if (streamCache != null) { + return streamCache.length(); + } else if (readCache != null) { + return readCache.length(); + } else { + // should not happen + return 0; + } + } + @Override public InputStream getInputStream() { return stream; http://git-wip-us.apache.org/repos/asf/camel/blob/323008c5/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java index d9879c9..5e57f2e 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java @@ -37,6 +37,7 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi // TODO: logic for spool to disk in this class so we can control this // TODO: add memory based watermarks for spool to disk + // TODO: add statistics on|off option and also have avg size stats @Deprecated public static final String THRESHOLD = "CamelCachedOutputStreamThreshold"; @@ -59,6 +60,8 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi private boolean removeSpoolDirectoryWhenStopping = true; private volatile long cacheMemoryCounter; private volatile long cacheSpoolCounter; + private volatile long cacheMemorySize; + private volatile long cacheSpoolSize; public CamelContext getCamelContext() { return camelContext; @@ -128,14 +131,28 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi return cacheSpoolCounter; } + public long getCacheMemorySize() { + return cacheMemorySize; + } + + public long getCacheSpoolSize() { + return cacheSpoolSize; + } + public StreamCache cache(Exchange exchange) { StreamCache cache = exchange.getIn().getBody(StreamCache.class); - if (cache != null) { - if (cache.inMemory()) { - cacheMemoryCounter++; - } else { - cacheSpoolCounter++; + try { + if (cache != null) { + if (cache.inMemory()) { + cacheMemoryCounter++; + cacheMemorySize += cache.length(); + } else { + cacheSpoolCounter++; + cacheSpoolSize += cache.length(); + } } + } catch (Exception e) { + LOG.debug("Error updating cache statistics. This exception is ignored.", e); } return cache; } @@ -242,6 +259,8 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi cacheMemoryCounter = 0; cacheSpoolCounter = 0; + cacheMemorySize = 0; + cacheSpoolSize = 0; } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/323008c5/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java index b9b193f..a731c6e 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java @@ -89,4 +89,12 @@ public class ManagedStreamCachingStrategy extends ManagedService implements Mana public long getCacheSpoolCounter() { return streamCachingStrategy.getCacheSpoolCounter(); } + + public long getCacheMemorySize() { + return streamCachingStrategy.getCacheMemorySize(); + } + + public long getCacheSpoolSize() { + return streamCachingStrategy.getCacheSpoolSize(); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/323008c5/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java b/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java index 2d4721d..858b50e 100644 --- a/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java @@ -96,6 +96,16 @@ public interface StreamCachingStrategy extends Service { long getCacheSpoolCounter(); /** + * Gets the total accumulated number of bytes which has been stream cached for in-memory stream caches. + */ + long getCacheMemorySize(); + + /** + * Gets the total accumulated number of bytes which has been stream cached for spooled stream caches. + */ + long getCacheSpoolSize(); + + /** * Caches the body aas a {@link StreamCache}. * * @param exchange the exchange http://git-wip-us.apache.org/repos/asf/camel/blob/323008c5/camel-core/src/test/java/org/apache/camel/management/ManagedStreamCachingStrategyTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedStreamCachingStrategyTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedStreamCachingStrategyTest.java index c4f45f3..47b4f0e 100644 --- a/camel-core/src/test/java/org/apache/camel/management/ManagedStreamCachingStrategyTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedStreamCachingStrategyTest.java @@ -64,6 +64,12 @@ public class ManagedStreamCachingStrategyTest extends ManagementTestSupport { counter = (Long) mbeanServer.getAttribute(name, "CacheSpoolCounter"); assertEquals(0, counter.longValue()); + Long cacheSize = (Long) mbeanServer.getAttribute(name, "CacheMemorySize"); + assertEquals(0, cacheSize.longValue()); + + cacheSize = (Long) mbeanServer.getAttribute(name, "CacheSpoolSize"); + assertEquals(0, cacheSize.longValue()); + String chiper = (String) mbeanServer.getAttribute(name, "SpoolChiper"); assertNull(chiper); http://git-wip-us.apache.org/repos/asf/camel/blob/323008c5/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java b/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java index 0f83c6b..4826c41 100644 --- a/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java +++ b/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java @@ -66,6 +66,11 @@ public class MessageHelperTest extends TestCase { public boolean inMemory() { return true; } + + @Override + public long length() { + return 0; + } }); MessageHelper.resetStreamCache(message); assertTrue("Should have reset the stream cache", reset.get()); http://git-wip-us.apache.org/repos/asf/camel/blob/323008c5/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java ---------------------------------------------------------------------- diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java index c5ee8ba..344e88b 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java @@ -85,4 +85,14 @@ public final class NettyChannelBufferStreamCache extends InputStream implements buffer.readerIndex(idx); } } + + @Override + public boolean inMemory() { + return true; + } + + @Override + public long length() { + return buffer.readableBytes(); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/323008c5/platforms/karaf/commands/src/main/java/org/apache/camel/karaf/commands/ContextInfo.java ---------------------------------------------------------------------- diff --git a/platforms/karaf/commands/src/main/java/org/apache/camel/karaf/commands/ContextInfo.java b/platforms/karaf/commands/src/main/java/org/apache/camel/karaf/commands/ContextInfo.java index 0566617..c6b3f7f 100644 --- a/platforms/karaf/commands/src/main/java/org/apache/camel/karaf/commands/ContextInfo.java +++ b/platforms/karaf/commands/src/main/java/org/apache/camel/karaf/commands/ContextInfo.java @@ -153,9 +153,11 @@ public class ContextInfo extends OsgiCommandSupport { camelContext.getStreamCachingStrategy().getBufferSize(), camelContext.getStreamCachingStrategy().isRemoveSpoolDirectoryWhenStopping()))); - System.out.println(StringEscapeUtils.unescapeJava(String.format("\t [cacheMemoryCounter=%s, cacheSpoolCounter=%s]", + System.out.println(StringEscapeUtils.unescapeJava(String.format("\t [cacheMemoryCounter=%s, cacheMemorySize=%s, cacheSpoolCounter=%s, cacheSpoolSize=%s]", camelContext.getStreamCachingStrategy().getCacheMemoryCounter(), - camelContext.getStreamCachingStrategy().getCacheSpoolCounter()))); + camelContext.getStreamCachingStrategy().getCacheMemorySize(), + camelContext.getStreamCachingStrategy().getCacheSpoolCounter(), + camelContext.getStreamCachingStrategy().getCacheSpoolSize()))); } long activeRoutes = 0;