Updated Branches: refs/heads/master 3d7076cef -> 5369bf1ca
CAMEL-6476: Introducing StreamCachingStrategy SPI to make it easier to configure and allow 3rd party to plugin custom strategies. Work in progress. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b11f4e71 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b11f4e71 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b11f4e71 Branch: refs/heads/master Commit: b11f4e7144fb9c9ce24cfb9d74640e920d1c786f Parents: 3d7076c Author: Claus Ibsen <[email protected]> Authored: Mon Jul 22 15:11:22 2013 +0200 Committer: Claus Ibsen <[email protected]> Committed: Mon Jul 22 15:11:22 2013 +0200 ---------------------------------------------------------------------- .../ManagedStreamCachingStrategyMBean.java | 6 + .../apache/camel/impl/DefaultCamelContext.java | 4 + .../impl/DefaultStreamCachingStrategy.java | 139 +++++++++++++++++-- .../mbean/ManagedStreamCachingStrategy.java | 8 ++ .../apache/camel/spi/StreamCachingStrategy.java | 44 ++++++ .../StreamCachingCustomShouldSpoolTaskTest.java | 118 ++++++++++++++++ 6 files changed, 305 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/b11f4e71/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java index dd52a93..ea49ec5 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java @@ -48,6 +48,12 @@ public interface ManagedStreamCachingStrategyMBean { @ManagedAttribute(description = "Whether to remove spool directory when stopping") boolean isRemoveSpoolDirectoryWhenStopping(); + @ManagedAttribute(description = "Whether any or all should spool tasks determines should spool") + void setAnySpoolTasks(boolean any); + + @ManagedAttribute(description = "Whether any or all should spool tasks determines should spool") + boolean isAnySpoolTasks(); + @ManagedAttribute(description = "Number of in-memory StreamCache created") long getCacheMemoryCounter(); http://git-wip-us.apache.org/repos/asf/camel/blob/b11f4e71/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java index 1d92e4f..a75af0e 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java @@ -1675,6 +1675,10 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon // stream caching is in use so enable the strategy getStreamCachingStrategy().setEnabled(true); addService(getStreamCachingStrategy()); + } else { + // log if stream caching is not in use as this can help people to enable it if they use streams + log.info("StreamCaching is not in use. If using streams then its recommended to enable stream caching." + + " See more details at http://camel.apache.org/stream-caching.html"); } // start routes http://git-wip-us.apache.org/repos/asf/camel/blob/b11f4e71/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java index 15dd854..ddacc5d 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java @@ -17,6 +17,10 @@ package org.apache.camel.impl; import java.io.File; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.util.LinkedHashSet; +import java.util.Set; import java.util.UUID; import org.apache.camel.CamelContext; @@ -35,7 +39,7 @@ import org.slf4j.LoggerFactory; */ public class DefaultStreamCachingStrategy extends org.apache.camel.support.ServiceSupport implements CamelContextAware, StreamCachingStrategy { - // TODO: add memory based watermarks for spool to disk + // TODO: add easy configuration in XML to add custom should spool tasks @Deprecated public static final String THRESHOLD = "CamelCachedOutputStreamThreshold"; @@ -53,10 +57,13 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi private File spoolDirectory; private transient String spoolDirectoryName = "${java.io.tmpdir}camel-tmp-#uuid#"; private long spoolThreshold = StreamCache.DEFAULT_SPOOL_THRESHOLD; + private int spoolHeapMemoryWatermarkThreshold; private String spoolChiper; private int bufferSize = IOHelper.DEFAULT_BUFFER_SIZE; private boolean removeSpoolDirectoryWhenStopping = true; private final UtilizationStatistics statistics = new UtilizationStatistics(); + private final Set<ShouldSpoolTask> spoolTasks = new LinkedHashSet<ShouldSpoolTask>(); + private boolean anySpoolTasks; public CamelContext getCamelContext() { return camelContext; @@ -90,6 +97,14 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi return spoolThreshold; } + public int getSpoolHeapMemoryWatermarkThreshold() { + return spoolHeapMemoryWatermarkThreshold; + } + + public void setSpoolHeapMemoryWatermarkThreshold(int spoolHeapMemoryWatermarkThreshold) { + this.spoolHeapMemoryWatermarkThreshold = spoolHeapMemoryWatermarkThreshold; + } + public void setSpoolThreshold(long spoolThreshold) { this.spoolThreshold = spoolThreshold; } @@ -118,15 +133,42 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi this.removeSpoolDirectoryWhenStopping = removeSpoolDirectoryWhenStopping; } + public boolean isAnySpoolTasks() { + return anySpoolTasks; + } + + public void setAnySpoolTasks(boolean anySpoolTasks) { + this.anySpoolTasks = anySpoolTasks; + } + public Statistics getStatistics() { return statistics; } public boolean shouldSpoolCache(long length) { - if (spoolThreshold > 0 && length >= spoolThreshold) { - return true; + if (spoolTasks.isEmpty()) { + return false; + } + + boolean all = true; + boolean any = false; + for (ShouldSpoolTask task : spoolTasks) { + boolean result = task.shouldSpoolCache(length); + if (!result) { + all = false; + } else { + any = true; + if (anySpoolTasks) { + // no need to check anymore + break; + } + } } - return false; + return anySpoolTasks ? any : all; + } + + public void addShouldSpoolTask(ShouldSpoolTask task) { + spoolTasks.add(task); } public StreamCache cache(Exchange exchange) { @@ -183,18 +225,24 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi LOG.warn("Configuring of StreamCaching using CamelContext properties is deprecated - use StreamCachingStrategy instead."); } + if (spoolHeapMemoryWatermarkThreshold < 0 || spoolHeapMemoryWatermarkThreshold > 100) { + throw new IllegalArgumentException("SpoolHeapMemoryWatermarkThreshold must be a value between 0 and 100, was: " + spoolHeapMemoryWatermarkThreshold); + } + // if we can overflow to disk then make sure directory exists / is created - if (spoolThreshold > 0) { + if (spoolThreshold > 0 || spoolHeapMemoryWatermarkThreshold > 0) { if (spoolDirectory == null && spoolDirectoryName == null) { throw new IllegalArgumentException("SpoolDirectory must be configured when using SpoolThreshold > 0"); } - if (spoolDirectory == null && spoolDirectoryName != null) { + if (spoolDirectory == null) { String name = resolveSpoolDirectory(spoolDirectoryName); if (name != null) { spoolDirectory = new File(name); spoolDirectoryName = null; + } else { + throw new IllegalStateException("Cannot resolve spool directory from pattern: " + spoolDirectoryName); } } @@ -211,15 +259,23 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi } else { LOG.debug("Created spool directory: {}", spoolDirectory); } + + } + + if (spoolThreshold > 0) { + spoolTasks.add(new FixedThresholdShouldSpoolTask(spoolThreshold)); + } + if (spoolHeapMemoryWatermarkThreshold > 0) { + spoolTasks.add(new UsedHeapMemoryShouldSpoolTask(spoolHeapMemoryWatermarkThreshold)); } } LOG.debug("StreamCaching configuration {}", this.toString()); - if (spoolThreshold > 0) { - LOG.info("StreamCaching in use and overflow to disk enabled when > {} bytes to directory: {}", spoolThreshold, spoolDirectory); + if (spoolDirectory != null) { + LOG.info("StreamCaching in use with spool directory: {} and thresholds: {}", spoolDirectory.getPath(), spoolTasks.toString()); } else { - LOG.info("StreamCaching in use with no overflow to disk (memory only)"); + LOG.info("StreamCaching in use with thresholds: {}", spoolTasks.toString()); } } @@ -263,11 +319,66 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi + "spoolDirectory=" + spoolDirectory + ", spoolThreshold=" + spoolThreshold + ", spoolChiper=" + spoolChiper - + ", bufferSize=" + bufferSize + "]"; + + ", bufferSize=" + bufferSize + + ", anySpoolTasks=" + anySpoolTasks + "]"; + } + + private static final class FixedThresholdShouldSpoolTask implements ShouldSpoolTask { + + private final long threshold; + + private FixedThresholdShouldSpoolTask(long threshold) { + this.threshold = threshold; + } + + public boolean shouldSpoolCache(long length) { + if (threshold > 0 && length > threshold) { + LOG.trace("Should spool cache {} > {} -> true", length, threshold); + return true; + } + return false; + } + + public String toString() { + if (threshold < 1024) { + return "Spool > " + threshold + " bytes body size"; + } else { + return "Spool > " + (threshold >> 10) + "K body size"; + } + } + } + + private final class UsedHeapMemoryShouldSpoolTask implements ShouldSpoolTask { + + private final MemoryMXBean heapUsage; + private final int spoolPercentage; + + private UsedHeapMemoryShouldSpoolTask(int spoolPercentage) { + this.spoolPercentage = spoolPercentage; + this.heapUsage = ManagementFactory.getMemoryMXBean(); + } + + public boolean shouldSpoolCache(long length) { + if (spoolPercentage > 0) { + long used = heapUsage.getHeapMemoryUsage().getUsed(); + long committed = heapUsage.getHeapMemoryUsage().getCommitted(); + long percentage = committed / used * 100; + LOG.trace("Heap memory: [used=%sK (%sK\\%), committed=%sK]", new Object[]{used >> 10, percentage, committed >> 10}); + if (percentage >= spoolPercentage) { + LOG.trace("Should spool cache {} > {} -> true", percentage, spoolPercentage); + return true; + } + } + return false; + } + + public String toString() { + return "Spool > " + spoolPercentage + "% used heap memory"; + } } /** - * Represents utilization statistics + * Represents utilization statistics. */ private final class UtilizationStatistics implements Statistics { @@ -279,13 +390,13 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi private volatile long spoolSize; private volatile long spoolAverageSize; - void updateMemory(long size) { + synchronized void updateMemory(long size) { memoryCounter++; memorySize += size; memoryAverageSize = memorySize / memoryCounter; } - void updateSpool(long size) { + synchronized void updateSpool(long size) { spoolCounter++; spoolSize += size; spoolAverageSize = spoolSize / spoolCounter; @@ -315,7 +426,7 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi return spoolAverageSize; } - public void reset() { + public synchronized void reset() { memoryCounter = 0; memorySize = 0; memoryAverageSize = 0; http://git-wip-us.apache.org/repos/asf/camel/blob/b11f4e71/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java index 7a6f81e..e831d1f 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java @@ -82,6 +82,14 @@ public class ManagedStreamCachingStrategy extends ManagedService implements Mana return streamCachingStrategy.isRemoveSpoolDirectoryWhenStopping(); } + public void setAnySpoolTasks(boolean any) { + streamCachingStrategy.setAnySpoolTasks(any); + } + + public boolean isAnySpoolTasks() { + return streamCachingStrategy.isAnySpoolTasks(); + } + public long getCacheMemoryCounter() { return streamCachingStrategy.getStatistics().getCacheMemoryCounter(); } http://git-wip-us.apache.org/repos/asf/camel/blob/b11f4e71/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java b/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java index 52c6fad..c44eb4d 100644 --- a/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java @@ -81,6 +81,24 @@ public interface StreamCachingStrategy extends Service { } /** + * Task for determine if stream caching should be spooled to disk or kept in-memory. + */ + interface ShouldSpoolTask { + + /** + * Determines if the stream should be spooled or not. For example if the stream length is + * over a threshold. + * <p/> + * This allows implementations to use custom strategies to determine if spooling is needed or not. + * + * @param length the length of the stream + * @return <tt>true</tt> to spool the cache, or <tt>false</tt> to keep the cache in-memory + */ + boolean shouldSpoolCache(long length); + + } + + /** * Sets whether the stream caching is enabled. * <p/> * <b>Notice:</b> This cannot be changed at runtime. @@ -112,6 +130,15 @@ public interface StreamCachingStrategy extends Service { long getSpoolThreshold(); /** + * Sets a percentage (0-100) of used heap memory threshold to activate spooling to disk. + * + * @param percentage percentage of used heap memory. + */ + void setSpoolHeapMemoryWatermarkThreshold(int percentage); + + int getSpoolHeapMemoryWatermarkThreshold(); + + /** * Sets the buffer size to use when allocating in-memory buffers used for in-memory stream caches. * <p/> * The default size is {@link org.apache.camel.util.IOHelper#DEFAULT_BUFFER_SIZE} @@ -139,11 +166,28 @@ public interface StreamCachingStrategy extends Service { boolean isRemoveSpoolDirectoryWhenStopping(); /** + * Sets whether if just any of the {@link ShouldSpoolTask} + * returns <tt>true</tt> then {@link #shouldSpoolCache(long)} returns <tt>true</tt>. + * If this option is <tt>false</tt>, then <b>all</b> the {@link ShouldSpoolTask} must + * return <tt>true</tt>. + * <p/> + * The default value is <tt>false</tt> + */ + void setAnySpoolTasks(boolean any); + + boolean isAnySpoolTasks(); + + /** * Gets the utilization statistics. */ Statistics getStatistics(); /** + * Adds the {@link ShouldSpoolTask} to be used. + */ + void addShouldSpoolTask(ShouldSpoolTask task); + + /** * Determines if the stream should be spooled or not. For example if the stream length is * over a threshold. * <p/> http://git-wip-us.apache.org/repos/asf/camel/blob/b11f4e71/camel-core/src/test/java/org/apache/camel/processor/StreamCachingCustomShouldSpoolTaskTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/StreamCachingCustomShouldSpoolTaskTest.java b/camel-core/src/test/java/org/apache/camel/processor/StreamCachingCustomShouldSpoolTaskTest.java new file mode 100644 index 0000000..3a96ea7 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/StreamCachingCustomShouldSpoolTaskTest.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FilterInputStream; +import java.io.InputStream; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.spi.StreamCachingStrategy; + +public class StreamCachingCustomShouldSpoolTaskTest extends ContextTestSupport { + + private MyCustomShouldSpoolTask spoolTask = new MyCustomShouldSpoolTask(); + + @Override + protected void setUp() throws Exception { + deleteDirectory("target/cachedir"); + super.setUp(); + } + + public void testByteArrayInputStream() throws Exception { + getMockEndpoint("mock:english").expectedBodiesReceived("<hello/>"); + getMockEndpoint("mock:dutch").expectedBodiesReceived("<hallo/>"); + getMockEndpoint("mock:german").expectedBodiesReceived("<hallo/>"); + getMockEndpoint("mock:french").expectedBodiesReceived("<hellos/>"); + + // need to wrap in MyInputStream as ByteArrayInputStream is optimized to just reuse in memory buffer + // and not needed to spool to disk + template.sendBody("direct:a", new MyInputStream(new ByteArrayInputStream("<hello/>".getBytes()))); + + spoolTask.setSpool(true); + template.sendBody("direct:a", new MyInputStream(new ByteArrayInputStream("<hallo/>".getBytes()))); + template.sendBody("direct:a", new MyInputStream(new ByteArrayInputStream("<hellos/>".getBytes()))); + + assertMockEndpointsSatisfied(); + } + + private class MyInputStream extends FilterInputStream { + + private MyInputStream(InputStream in) { + super(in); + } + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + context.getStreamCachingStrategy().setSpoolDirectory("target/cachedir"); + context.getStreamCachingStrategy().addShouldSpoolTask(spoolTask); + context.getStreamCachingStrategy().setAnySpoolTasks(true); + context.setStreamCaching(true); + + from("direct:a") + .choice() + .when(xpath("//hello")).to("mock:english") + .when(xpath("//hallo")).to("mock:dutch", "mock:german") + .otherwise().to("mock:french") + .end() + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + // check if spool file exists + if (spoolTask.isSpool()) { + String[] names = new File("target/cachedir").list(); + assertEquals("There should be a cached spool file", 1, names.length); + } + } + }); + + } + }; + } + + private static final class MyCustomShouldSpoolTask implements StreamCachingStrategy.ShouldSpoolTask { + + private volatile boolean spool; + + @Override + public boolean shouldSpoolCache(long length) { + return spool; + } + + public boolean isSpool() { + return spool; + } + + public void setSpool(boolean spool) { + this.spool = spool; + } + + @Override + public String toString() { + return "MyCustomShouldSpoolTask"; + } + } +} +
