CAMEL-6476: Introducing StreamCachingStrategy SPI to make it easier to 
configure and allow 3rd party to plugin custom strategies. Work in progress.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/323008c5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/323008c5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/323008c5

Branch: refs/heads/master
Commit: 323008c505794b224c6908f623c889ed8a6f9b6f
Parents: 94133b9
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sat Jul 20 13:52:40 2013 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sun Jul 21 17:42:45 2013 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/camel/StreamCache.java | 10 +++++++
 .../ManagedStreamCachingStrategyMBean.java      |  8 +++++-
 .../stream/ByteArrayInputStreamCache.java       |  8 ++++++
 .../converter/stream/FileInputStreamCache.java  | 10 +++++--
 .../converter/stream/InputStreamCache.java      |  3 ++
 .../camel/converter/stream/ReaderCache.java     |  7 +++--
 .../camel/converter/stream/SourceCache.java     |  7 +++++
 .../converter/stream/StreamSourceCache.java     | 11 ++++++++
 .../impl/DefaultStreamCachingStrategy.java      | 29 ++++++++++++++++----
 .../mbean/ManagedStreamCachingStrategy.java     |  8 ++++++
 .../apache/camel/spi/StreamCachingStrategy.java | 10 +++++++
 .../ManagedStreamCachingStrategyTest.java       |  6 ++++
 .../apache/camel/util/MessageHelperTest.java    |  5 ++++
 .../http/NettyChannelBufferStreamCache.java     | 10 +++++++
 .../camel/karaf/commands/ContextInfo.java       |  6 ++--
 15 files changed, 125 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/323008c5/camel-core/src/main/java/org/apache/camel/StreamCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/StreamCache.java 
b/camel-core/src/main/java/org/apache/camel/StreamCache.java
index 089614a..5333bd3 100644
--- a/camel-core/src/main/java/org/apache/camel/StreamCache.java
+++ b/camel-core/src/main/java/org/apache/camel/StreamCache.java
@@ -51,4 +51,14 @@ public interface StreamCache {
      */
     boolean inMemory();
 
+    /**
+     * Gets the length of the cached stream.
+     * <p/>
+     * The implementation may return <tt>0</tt> in cases where the length
+     * cannot be computed, or if the implementation does not support this.
+     *
+     * @return number of bytes in the cache.
+     */
+    long length();
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/323008c5/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java
 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java
index 70acf3f..7cf3bf4 100644
--- 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java
+++ 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java
@@ -53,4 +53,10 @@ public interface ManagedStreamCachingStrategyMBean {
     @ManagedAttribute(description = "Number of spooled (not in-memory) 
StreamCache created")
     long getCacheSpoolCounter();
 
-}
+    @ManagedAttribute(description = "Total accumulated number of bytes which 
has been stream cached for in-memory StreamCache")
+    long getCacheMemorySize();
+
+    @ManagedAttribute(description = "Total accumulated number of bytes which 
has been stream cached for spooled StreamCache")
+    long getCacheSpoolSize();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/323008c5/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java
 
b/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java
index ba41c27..ba91472 100644
--- 
a/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java
+++ 
b/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java
@@ -29,8 +29,11 @@ import org.apache.camel.util.IOHelper;
  */
 public class ByteArrayInputStreamCache extends FilterInputStream implements 
StreamCache {
 
+    private final int length;
+
     public ByteArrayInputStreamCache(ByteArrayInputStream in) {
         super(in);
+        this.length = in.available();
     }
 
     public void reset() {
@@ -49,4 +52,9 @@ public class ByteArrayInputStreamCache extends 
FilterInputStream implements Stre
     public boolean inMemory() {
         return true;
     }
+
+    @Override
+    public long length() {
+        return length;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/323008c5/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
 
b/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
index 4494117..c794431 100644
--- 
a/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
+++ 
b/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
@@ -37,8 +37,9 @@ import org.apache.camel.util.IOHelper;
  */
 public final class FileInputStreamCache extends InputStream implements 
StreamCache {
     private InputStream stream;
-    private File file;
-    private CipherPair ciphers;
+    private final File file;
+    private final CipherPair ciphers;
+    private final long length;
 
     public FileInputStreamCache(File file) throws FileNotFoundException {
         this(file, null);
@@ -48,6 +49,7 @@ public final class FileInputStreamCache extends InputStream 
implements StreamCac
         this.file = file;
         this.stream = null;
         this.ciphers = ciphers;
+        this.length = file.length();
     }
     
     @Override
@@ -95,6 +97,10 @@ public final class FileInputStreamCache extends InputStream 
implements StreamCac
         return false;
     }
 
+    public long length() {
+        return length;
+    }
+
     @Override
     public int available() throws IOException {
         return getInputStream().available();

http://git-wip-us.apache.org/repos/asf/camel/blob/323008c5/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
 
b/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
index d808215..ff0f9ce 100644
--- 
a/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
+++ 
b/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
@@ -44,4 +44,7 @@ public final class InputStreamCache extends 
ByteArrayInputStream implements Stre
         return true;
     }
 
+    public long length() {
+        return count;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/323008c5/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java 
b/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
index c4da171..9dbcad4 100644
--- 
a/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
+++ 
b/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
@@ -21,15 +21,12 @@ import java.io.OutputStream;
 import java.io.StringReader;
 
 import org.apache.camel.StreamCache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * A {@link org.apache.camel.StreamCache} for String {@link java.io.Reader}s
  */
 public class ReaderCache extends StringReader implements StreamCache {
 
-    private static final transient Logger LOG = 
LoggerFactory.getLogger(ReaderCache.class);
     private final String data;
 
     public ReaderCache(String data) {
@@ -58,6 +55,10 @@ public class ReaderCache extends StringReader implements 
StreamCache {
         return true;
     }
 
+    public long length() {
+        return data.length();
+    }
+
     String getData() {
         return data;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/323008c5/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java 
b/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
index 9b2bf01..8ac7c22 100644
--- 
a/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
+++ 
b/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
@@ -29,9 +29,11 @@ import org.apache.camel.util.IOHelper;
 public final class SourceCache extends StringSource implements StreamCache {
 
     private static final long serialVersionUID = 1L;
+    private final int length;
 
     public SourceCache(String data) {
         super(data);
+        this.length = data.length();
     }
 
     public void reset() {
@@ -45,4 +47,9 @@ public final class SourceCache extends StringSource 
implements StreamCache {
     public boolean inMemory() {
         return true;
     }
+
+    @Override
+    public long length() {
+        return length;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/323008c5/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
 
b/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
index a50b8bc..caa1adc 100644
--- 
a/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
+++ 
b/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
@@ -93,6 +93,17 @@ public final class StreamSourceCache extends StreamSource 
implements StreamCache
         }
     }
 
+    public long length() {
+        if (streamCache != null) {
+            return streamCache.length();
+        } else if (readCache != null) {
+            return readCache.length();
+        } else {
+            // should not happen
+            return 0;
+        }
+    }
+
     @Override
     public InputStream getInputStream() {
         return stream;

http://git-wip-us.apache.org/repos/asf/camel/blob/323008c5/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java
 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java
index d9879c9..5e57f2e 100644
--- 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java
+++ 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java
@@ -37,6 +37,7 @@ public class DefaultStreamCachingStrategy extends 
org.apache.camel.support.Servi
 
     // TODO: logic for spool to disk in this class so we can control this
     // TODO: add memory based watermarks for spool to disk
+    // TODO: add statistics on|off option and also have avg size stats
 
     @Deprecated
     public static final String THRESHOLD = "CamelCachedOutputStreamThreshold";
@@ -59,6 +60,8 @@ public class DefaultStreamCachingStrategy extends 
org.apache.camel.support.Servi
     private boolean removeSpoolDirectoryWhenStopping = true;
     private volatile long cacheMemoryCounter;
     private volatile long cacheSpoolCounter;
+    private volatile long cacheMemorySize;
+    private volatile long cacheSpoolSize;
 
     public CamelContext getCamelContext() {
         return camelContext;
@@ -128,14 +131,28 @@ public class DefaultStreamCachingStrategy extends 
org.apache.camel.support.Servi
         return cacheSpoolCounter;
     }
 
+    public long getCacheMemorySize() {
+        return cacheMemorySize;
+    }
+
+    public long getCacheSpoolSize() {
+        return cacheSpoolSize;
+    }
+
     public StreamCache cache(Exchange exchange) {
         StreamCache cache = exchange.getIn().getBody(StreamCache.class);
-        if (cache != null) {
-            if (cache.inMemory()) {
-                cacheMemoryCounter++;
-            } else {
-                cacheSpoolCounter++;
+        try {
+            if (cache != null) {
+                if (cache.inMemory()) {
+                    cacheMemoryCounter++;
+                    cacheMemorySize += cache.length();
+                } else {
+                    cacheSpoolCounter++;
+                    cacheSpoolSize += cache.length();
+                }
             }
+        } catch (Exception e) {
+            LOG.debug("Error updating cache statistics. This exception is 
ignored.", e);
         }
         return cache;
     }
@@ -242,6 +259,8 @@ public class DefaultStreamCachingStrategy extends 
org.apache.camel.support.Servi
 
         cacheMemoryCounter = 0;
         cacheSpoolCounter = 0;
+        cacheMemorySize = 0;
+        cacheSpoolSize = 0;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/323008c5/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java
 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java
index b9b193f..a731c6e 100644
--- 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java
+++ 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java
@@ -89,4 +89,12 @@ public class ManagedStreamCachingStrategy extends 
ManagedService implements Mana
     public long getCacheSpoolCounter() {
         return streamCachingStrategy.getCacheSpoolCounter();
     }
+
+    public long getCacheMemorySize() {
+        return streamCachingStrategy.getCacheMemorySize();
+    }
+
+    public long getCacheSpoolSize() {
+        return streamCachingStrategy.getCacheSpoolSize();
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/323008c5/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java 
b/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java
index 2d4721d..858b50e 100644
--- a/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java
@@ -96,6 +96,16 @@ public interface StreamCachingStrategy extends Service {
     long getCacheSpoolCounter();
 
     /**
+     * Gets the total accumulated number of bytes which has been stream cached 
for in-memory stream caches.
+     */
+    long getCacheMemorySize();
+
+    /**
+     * Gets the total accumulated number of bytes which has been stream cached 
for spooled stream caches.
+     */
+    long getCacheSpoolSize();
+
+    /**
      * Caches the body aas a {@link StreamCache}.
      *
      * @param exchange the exchange

http://git-wip-us.apache.org/repos/asf/camel/blob/323008c5/camel-core/src/test/java/org/apache/camel/management/ManagedStreamCachingStrategyTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/management/ManagedStreamCachingStrategyTest.java
 
b/camel-core/src/test/java/org/apache/camel/management/ManagedStreamCachingStrategyTest.java
index c4f45f3..47b4f0e 100644
--- 
a/camel-core/src/test/java/org/apache/camel/management/ManagedStreamCachingStrategyTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/management/ManagedStreamCachingStrategyTest.java
@@ -64,6 +64,12 @@ public class ManagedStreamCachingStrategyTest extends 
ManagementTestSupport {
         counter = (Long) mbeanServer.getAttribute(name, "CacheSpoolCounter");
         assertEquals(0, counter.longValue());
 
+        Long cacheSize = (Long) mbeanServer.getAttribute(name, 
"CacheMemorySize");
+        assertEquals(0, cacheSize.longValue());
+
+        cacheSize = (Long) mbeanServer.getAttribute(name, "CacheSpoolSize");
+        assertEquals(0, cacheSize.longValue());
+
         String chiper = (String) mbeanServer.getAttribute(name, "SpoolChiper");
         assertNull(chiper);
 

http://git-wip-us.apache.org/repos/asf/camel/blob/323008c5/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java 
b/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java
index 0f83c6b..4826c41 100644
--- a/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java
+++ b/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java
@@ -66,6 +66,11 @@ public class MessageHelperTest extends TestCase {
             public boolean inMemory() {
                 return true;
             }
+
+            @Override
+            public long length() {
+                return 0;
+            }
         });
         MessageHelper.resetStreamCache(message);
         assertTrue("Should have reset the stream cache", reset.get());

http://git-wip-us.apache.org/repos/asf/camel/blob/323008c5/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java
 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java
index c5ee8ba..344e88b 100644
--- 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java
+++ 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java
@@ -85,4 +85,14 @@ public final class NettyChannelBufferStreamCache extends 
InputStream implements
             buffer.readerIndex(idx);
         }
     }
+
+    @Override
+    public boolean inMemory() {
+        return true;
+    }
+
+    @Override
+    public long length() {
+        return buffer.readableBytes();
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/323008c5/platforms/karaf/commands/src/main/java/org/apache/camel/karaf/commands/ContextInfo.java
----------------------------------------------------------------------
diff --git 
a/platforms/karaf/commands/src/main/java/org/apache/camel/karaf/commands/ContextInfo.java
 
b/platforms/karaf/commands/src/main/java/org/apache/camel/karaf/commands/ContextInfo.java
index 0566617..c6b3f7f 100644
--- 
a/platforms/karaf/commands/src/main/java/org/apache/camel/karaf/commands/ContextInfo.java
+++ 
b/platforms/karaf/commands/src/main/java/org/apache/camel/karaf/commands/ContextInfo.java
@@ -153,9 +153,11 @@ public class ContextInfo extends OsgiCommandSupport {
                             
camelContext.getStreamCachingStrategy().getBufferSize(),
                             
camelContext.getStreamCachingStrategy().isRemoveSpoolDirectoryWhenStopping())));
 
-                    
System.out.println(StringEscapeUtils.unescapeJava(String.format("\t             
          [cacheMemoryCounter=%s, cacheSpoolCounter=%s]",
+                    
System.out.println(StringEscapeUtils.unescapeJava(String.format("\t             
          [cacheMemoryCounter=%s, cacheMemorySize=%s, cacheSpoolCounter=%s, 
cacheSpoolSize=%s]",
                             
camelContext.getStreamCachingStrategy().getCacheMemoryCounter(),
-                            
camelContext.getStreamCachingStrategy().getCacheSpoolCounter())));
+                            
camelContext.getStreamCachingStrategy().getCacheMemorySize(),
+                            
camelContext.getStreamCachingStrategy().getCacheSpoolCounter(),
+                            
camelContext.getStreamCachingStrategy().getCacheSpoolSize())));
                 }
 
                 long activeRoutes = 0;

Reply via email to