Updated Branches: refs/heads/master 6f0d3368f -> d0235489f
CAMEL-6476: Introducing StreamCachingStrategy SPI to make it easier to configure and allow 3rd party to plug in custom strategies. Work in progress. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d0235489 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d0235489 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d0235489 Branch: refs/heads/master Commit: d0235489fd77031c70501d6e859807ae72117034 Parents: 6f0d336 Author: Claus Ibsen <davscl...@apache.org> Authored: Wed Jul 17 16:05:42 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Jul 17 17:11:28 2013 +0200 ---------------------------------------------------------------------- .../converter/stream/CachedOutputStream.java | 49 ++++----- .../apache/camel/impl/DefaultCamelContext.java | 6 -- .../impl/DefaultStreamCachingStrategy.java | 100 ++++++++++++++++++- .../apache/camel/spi/StreamCachingStrategy.java | 46 +++++++++ .../java/org/apache/camel/util/IOHelper.java | 5 +- .../stream/CachedOutputStreamTest.java | 56 ++++++++--- .../stream/StreamCacheConverterTest.java | 26 +++-- .../processor/SplitterStreamCacheTest.java | 4 +- .../xml/AbstractCamelContextFactoryBean.java | 6 ++ 9 files changed, 231 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/d0235489/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java index f12a111..1b7f90b 100644 --- a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java +++ b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java @@ -31,6 +31,7 @@ import 
javax.crypto.CipherOutputStream; import org.apache.camel.Exchange; import org.apache.camel.StreamCache; +import org.apache.camel.spi.StreamCachingStrategy; import org.apache.camel.support.SynchronizationAdapter; import org.apache.camel.util.FileUtil; import org.apache.camel.util.ObjectHelper; @@ -50,22 +51,22 @@ import org.slf4j.LoggerFactory; * fileInputStream is closed after the exchange is completed. */ public class CachedOutputStream extends OutputStream { + @Deprecated public static final String THRESHOLD = "CamelCachedOutputStreamThreshold"; + @Deprecated public static final String BUFFER_SIZE = "CamelCachedOutputStreamBufferSize"; + @Deprecated public static final String TEMP_DIR = "CamelCachedOutputStreamOutputDirectory"; + @Deprecated public static final String CIPHER_TRANSFORMATION = "CamelCachedOutputStreamCipherTransformation"; private static final transient Logger LOG = LoggerFactory.getLogger(CachedOutputStream.class); + private final StreamCachingStrategy strategy; private OutputStream currentStream; private boolean inMemory = true; private int totalLength; private File tempFile; private FileInputStreamCache fileInputStreamCache; - - private long threshold = StreamCache.DEFAULT_SPOOL_THRESHOLD; - private int bufferSize = 2 * 1024; - private File outputDir; - private String cipherTransformation; private CipherPair ciphers; public CachedOutputStream(Exchange exchange) { @@ -73,25 +74,8 @@ public class CachedOutputStream extends OutputStream { } public CachedOutputStream(Exchange exchange, boolean closedOnCompletion) { - // TODO: these options should be on StreamCachingStrategy - String bufferSize = exchange.getContext().getProperty(BUFFER_SIZE); - String hold = exchange.getContext().getProperty(THRESHOLD); - String dir = exchange.getContext().getProperty(TEMP_DIR); - - if (bufferSize != null) { - this.bufferSize = exchange.getContext().getTypeConverter().convertTo(Integer.class, bufferSize); - } - if (hold != null) { - this.threshold = 
exchange.getContext().getTypeConverter().convertTo(Long.class, hold); - } - if (dir != null) { - this.outputDir = exchange.getContext().getTypeConverter().convertTo(File.class, dir); - } else { - this.outputDir = exchange.getContext().getStreamCachingStrategy().getTemporaryDirectory(); - } - this.cipherTransformation = exchange.getContext().getProperty(CIPHER_TRANSFORMATION); - - currentStream = new ByteArrayOutputStream(this.bufferSize); + this.strategy = exchange.getContext().getStreamCachingStrategy(); + currentStream = new ByteArrayOutputStream(strategy.getBufferSize()); if (closedOnCompletion) { // add on completion so we can cleanup after the exchange is done such as deleting temporary files @@ -143,7 +127,7 @@ public class CachedOutputStream extends OutputStream { public void write(byte[] b, int off, int len) throws IOException { this.totalLength += len; - if (threshold > 0 && inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) { + if (strategy.getSpoolThreshold() > 0 && inMemory && totalLength > strategy.getSpoolThreshold() && currentStream instanceof ByteArrayOutputStream) { pageToFileStream(); } currentStream.write(b, off, len); @@ -151,7 +135,7 @@ public class CachedOutputStream extends OutputStream { public void write(byte[] b) throws IOException { this.totalLength += b.length; - if (threshold > 0 && inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) { + if (strategy.getSpoolThreshold() > 0 && inMemory && totalLength > strategy.getSpoolThreshold() && currentStream instanceof ByteArrayOutputStream) { pageToFileStream(); } currentStream.write(b); @@ -159,7 +143,7 @@ public class CachedOutputStream extends OutputStream { public void write(int b) throws IOException { this.totalLength++; - if (threshold > 0 && inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) { + if (strategy.getSpoolThreshold() > 0 && inMemory && totalLength > 
strategy.getSpoolThreshold() && currentStream instanceof ByteArrayOutputStream) { pageToFileStream(); } currentStream.write(b); @@ -224,7 +208,7 @@ public class CachedOutputStream extends OutputStream { flush(); ByteArrayOutputStream bout = (ByteArrayOutputStream)currentStream; - tempFile = FileUtil.createTempFile("cos", ".tmp", outputDir); + tempFile = FileUtil.createTempFile("cos", ".tmp", strategy.getTemporaryDirectory()); LOG.trace("Creating temporary stream cache file: {}", tempFile); @@ -236,9 +220,10 @@ public class CachedOutputStream extends OutputStream { inMemory = false; } } - + + @Deprecated public int getBufferSize() { - return bufferSize; + return strategy.getBufferSize(); } // This class will close the CachedOutputStream when it is closed @@ -275,10 +260,10 @@ public class CachedOutputStream extends OutputStream { private OutputStream createOutputStream(File file) throws IOException { OutputStream out = new BufferedOutputStream(new FileOutputStream(file)); - if (ObjectHelper.isNotEmpty(cipherTransformation)) { + if (ObjectHelper.isNotEmpty(strategy.getSpoolChiper())) { try { if (ciphers == null) { - ciphers = new CipherPair(cipherTransformation); + ciphers = new CipherPair(strategy.getSpoolChiper()); } } catch (GeneralSecurityException e) { throw new IOException(e.getMessage(), e); http://git-wip-us.apache.org/repos/asf/camel/blob/d0235489/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java index e4017be..b977979 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java @@ -1676,12 +1676,6 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon } } if (streamCachingInUse) { 
- Long threshold = CamelContextHelper.convertTo(this, Long.class, getProperties().get("CamelCachedOutputStreamThreshold")); - if (threshold == null) { - threshold = StreamCache.DEFAULT_SPOOL_THRESHOLD; - } - log.info("Stream caching is enabled, and using {} kb as threshold for overflow and spooling to disk store.", threshold / 1024); - // stream caching is in use so enable the strategy addService(streamCachingStrategy); } http://git-wip-us.apache.org/repos/asf/camel/blob/d0235489/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java index 3333d77..d5f558a 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java @@ -18,19 +18,52 @@ package org.apache.camel.impl; import java.io.File; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.StreamCache; import org.apache.camel.spi.StreamCachingStrategy; import org.apache.camel.util.FileUtil; +import org.apache.camel.util.IOHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class DefaultStreamCachingStrategy extends org.apache.camel.support.ServiceSupport implements StreamCachingStrategy { +/** + * Default implementation of {@link StreamCachingStrategy} + */ +public class DefaultStreamCachingStrategy extends org.apache.camel.support.ServiceSupport implements CamelContextAware, StreamCachingStrategy { - // TODO: Add options to configure more stuff like overflow size et all // TODO: Add JMX management // TODO: Maybe use #syntax# for default temp dir so ppl can easily configure this + @Deprecated + public static final String THRESHOLD = 
"CamelCachedOutputStreamThreshold"; + @Deprecated + public static final String BUFFER_SIZE = "CamelCachedOutputStreamBufferSize"; + @Deprecated + public static final String TEMP_DIR = "CamelCachedOutputStreamOutputDirectory"; + @Deprecated + public static final String CIPHER_TRANSFORMATION = "CamelCachedOutputStreamCipherTransformation"; + private static final Logger LOG = LoggerFactory.getLogger(DefaultStreamCachingStrategy.class); + + private CamelContext camelContext; private File temporaryDirectory; + private long spoolThreshold = StreamCache.DEFAULT_SPOOL_THRESHOLD; + private String spoolChiper; + private int bufferSize = IOHelper.DEFAULT_BUFFER_SIZE; + private boolean removeTemporaryDirectoryWhenStopping = true; + + public CamelContext getCamelContext() { + return camelContext; + } + + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + public void setTemporaryDirectory(String path) { + this.temporaryDirectory = new File(path); + } public void setTemporaryDirectory(File path) { this.temporaryDirectory = path; @@ -40,8 +73,60 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi return temporaryDirectory; } + public long getSpoolThreshold() { + return spoolThreshold; + } + + public void setSpoolThreshold(long spoolThreshold) { + this.spoolThreshold = spoolThreshold; + } + + public String getSpoolChiper() { + return spoolChiper; + } + + public void setSpoolChiper(String spoolChiper) { + this.spoolChiper = spoolChiper; + } + + public int getBufferSize() { + return bufferSize; + } + + public void setBufferSize(int bufferSize) { + this.bufferSize = bufferSize; + } + + public boolean isRemoveTemporaryDirectoryWhenStopping() { + return removeTemporaryDirectoryWhenStopping; + } + + public void setRemoveTemporaryDirectoryWhenStopping(boolean removeTemporaryDirectoryWhenStopping) { + this.removeTemporaryDirectoryWhenStopping = removeTemporaryDirectoryWhenStopping; + } + @Override protected 
void doStart() throws Exception { + String bufferSize = camelContext.getProperty(BUFFER_SIZE); + String hold = camelContext.getProperty(THRESHOLD); + String chiper = camelContext.getProperty(CIPHER_TRANSFORMATION); + String dir = camelContext.getProperty(TEMP_DIR); + + if (bufferSize != null) { + this.bufferSize = camelContext.getTypeConverter().convertTo(Integer.class, bufferSize); + } + if (hold != null) { + this.spoolThreshold = camelContext.getTypeConverter().convertTo(Long.class, hold); + } + if (chiper != null) { + this.spoolChiper = chiper; + } + if (dir != null) { + this.temporaryDirectory = camelContext.getTypeConverter().convertTo(File.class, dir); + } + + LOG.info("StreamCaching in use with {}", this.toString()); + // create random temporary directory if none has been created if (temporaryDirectory == null) { temporaryDirectory = FileUtil.createNewTempDir(); @@ -60,9 +145,18 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi @Override protected void doStop() throws Exception { - if (temporaryDirectory != null) { + if (temporaryDirectory != null && isRemoveTemporaryDirectoryWhenStopping()) { LOG.info("Removing temporary directory {}", temporaryDirectory); FileUtil.removeDir(temporaryDirectory); } } + + @Override + public String toString() { + return "DefaultStreamCachingStrategy[" + + "temporaryDirectory=" + temporaryDirectory + + ", spoolThreshold=" + spoolThreshold + + ", spoolChiper=" + spoolChiper + + ", bufferSize=" + bufferSize + "]"; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/d0235489/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java b/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java index cc5ad2e..a5cfa1b 100644 --- a/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java +++ 
b/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java @@ -23,7 +23,53 @@ import java.io.File; */ public interface StreamCachingStrategy { + /** + * Sets the temporary directory to use for overflow and spooling to disk. + * <p/> + * If no temporary directory has been explicitly configured, then a directory + * is created in the <tt>java.io.tmpdir</tt> directory. + */ void setTemporaryDirectory(File path); File getTemporaryDirectory(); + + void setTemporaryDirectory(String path); + + /** + * Threshold in bytes when overflow to disk is activated. + * <p/> + * The default threshold is {@link org.apache.camel.StreamCache#DEFAULT_SPOOL_THRESHOLD} bytes (eg 128kb). + * Use <tt>-1</tt> to disable overflow to disk. + */ + void setSpoolThreshold(long threshold); + + long getSpoolThreshold(); + + /** + * Sets the buffer size to use when copying between buffers. + * <p/> + * The default size is {@link org.apache.camel.util.IOHelper#DEFAULT_BUFFER_SIZE} + */ + void setBufferSize(int bufferSize); + + int getBufferSize(); + + /** + * Sets a cipher name to use when spooling to disk to write with encryption. + * <p/> + * By default the data is not encrypted. + */ + void setSpoolChiper(String chiper); + + String getSpoolChiper(); + + /** + * Whether to remove the temporary directory when stopping. 
+ * <p/> + * This option is default <tt>true</tt> + */ + void setRemoveTemporaryDirectoryWhenStopping(boolean remove); + + boolean isRemoveTemporaryDirectoryWhenStopping(); + } http://git-wip-us.apache.org/repos/asf/camel/blob/d0235489/camel-core/src/main/java/org/apache/camel/util/IOHelper.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/IOHelper.java b/camel-core/src/main/java/org/apache/camel/util/IOHelper.java index c381e90..76d1e5f 100644 --- a/camel-core/src/main/java/org/apache/camel/util/IOHelper.java +++ b/camel-core/src/main/java/org/apache/camel/util/IOHelper.java @@ -44,9 +44,10 @@ import org.slf4j.LoggerFactory; * @version */ public final class IOHelper { - + + public static final int DEFAULT_BUFFER_SIZE = 1024 * 4; + private static final transient Logger LOG = LoggerFactory.getLogger(IOHelper.class); - private static final int DEFAULT_BUFFER_SIZE = 1024 * 4; private static final Charset UTF8_CHARSET = Charset.forName("UTF-8"); private IOHelper() { http://git-wip-us.apache.org/repos/asf/camel/blob/d0235489/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java b/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java index 46316f6..91729f1 100644 --- a/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java +++ b/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import org.apache.camel.CamelContext; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; import org.apache.camel.StreamCache; @@ -38,11 +39,18 @@ public class 
CachedOutputStreamTest extends ContextTestSupport { private Exchange exchange; + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + context.setStreamCaching(true); + context.getStreamCachingStrategy().setTemporaryDirectory("target/cachedir"); + context.getStreamCachingStrategy().setSpoolThreshold(16); + return context; + } + protected void setUp() throws Exception { super.setUp(); - - context.getProperties().put(CachedOutputStream.TEMP_DIR, "target/cachedir"); - context.getProperties().put(CachedOutputStream.THRESHOLD, "16"); + deleteDirectory("target/cachedir"); createDirectory("target/cachedir"); @@ -51,6 +59,11 @@ public class CachedOutputStreamTest extends ContextTestSupport { exchange.setUnitOfWork(uow); } + @Override + public boolean isUseRouteBuilder() { + return false; + } + private static String toString(InputStream input) throws IOException { BufferedReader reader = IOHelper.buffered(new InputStreamReader(input)); CollectionStringBuffer builder = new CollectionStringBuffer(); @@ -63,7 +76,9 @@ public class CachedOutputStreamTest extends ContextTestSupport { } } - public void testCacheStreamToFileAndCloseStream() throws IOException { + public void testCacheStreamToFileAndCloseStream() throws Exception { + context.start(); + CachedOutputStream cos = new CachedOutputStream(exchange); cos.write(TEST_STRING.getBytes("UTF-8")); @@ -96,9 +111,12 @@ public class CachedOutputStreamTest extends ContextTestSupport { IOHelper.close(cos); } - public void testCacheStreamToFileAndCloseStreamEncrypted() throws IOException { + public void testCacheStreamToFileAndCloseStreamEncrypted() throws Exception { // set some stream or 8-bit block cipher transformation name - exchange.getContext().getProperties().put(CachedOutputStream.CIPHER_TRANSFORMATION, "RC4"); + context.getStreamCachingStrategy().setSpoolChiper("RC4"); + + context.start(); + CachedOutputStream cos = new CachedOutputStream(exchange); 
cos.write(TEST_STRING.getBytes("UTF-8")); cos.flush(); @@ -137,7 +155,9 @@ public class CachedOutputStreamTest extends ContextTestSupport { IOHelper.close(cos); } - public void testCacheStreamToFileCloseStreamBeforeDone() throws IOException { + public void testCacheStreamToFileCloseStreamBeforeDone() throws Exception { + context.start(); + CachedOutputStream cos = new CachedOutputStream(exchange); cos.write(TEST_STRING.getBytes("UTF-8")); @@ -163,8 +183,10 @@ public class CachedOutputStreamTest extends ContextTestSupport { IOHelper.close(cos); } - public void testCacheStreamToMemory() throws IOException { - context.getProperties().put(CachedOutputStream.THRESHOLD, "1024"); + public void testCacheStreamToMemory() throws Exception { + context.getStreamCachingStrategy().setSpoolThreshold(1024); + + context.start(); CachedOutputStream cos = new CachedOutputStream(exchange); cos.write(TEST_STRING.getBytes("UTF-8")); @@ -181,9 +203,11 @@ public class CachedOutputStreamTest extends ContextTestSupport { IOHelper.close(cos); } - public void testCacheStreamToMemoryAsDiskIsdisabled() throws IOException { + public void testCacheStreamToMemoryAsDiskIsDisabled() throws Exception { // -1 disables disk based cache - context.getProperties().put(CachedOutputStream.THRESHOLD, "-1"); + context.getStreamCachingStrategy().setSpoolThreshold(-1); + + context.start(); CachedOutputStream cos = new CachedOutputStream(exchange); cos.write(TEST_STRING.getBytes("UTF-8")); @@ -202,14 +226,16 @@ public class CachedOutputStreamTest extends ContextTestSupport { IOHelper.close(cos); } - public void testCachedOutputStreamCustomBufferSize() throws IOException { + public void testCachedOutputStreamCustomBufferSize() throws Exception { // double the default buffer size - context.getProperties().put(CachedOutputStream.BUFFER_SIZE, "4096"); - + context.getStreamCachingStrategy().setBufferSize(8192); + + context.start(); + CachedOutputStream cos = new CachedOutputStream(exchange); 
cos.write(TEST_STRING.getBytes("UTF-8")); - assertEquals("we should have a custom buffer size", cos.getBufferSize(), 4096); + assertEquals("we should have a custom buffer size", cos.getBufferSize(), 8192); // make sure things still work after custom buffer size set File file = new File("target/cachedir"); http://git-wip-us.apache.org/repos/asf/camel/blob/d0235489/camel-core/src/test/java/org/apache/camel/converter/stream/StreamCacheConverterTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/converter/stream/StreamCacheConverterTest.java b/camel-core/src/test/java/org/apache/camel/converter/stream/StreamCacheConverterTest.java index aeb710f..b792d09 100644 --- a/camel-core/src/test/java/org/apache/camel/converter/stream/StreamCacheConverterTest.java +++ b/camel-core/src/test/java/org/apache/camel/converter/stream/StreamCacheConverterTest.java @@ -19,9 +19,6 @@ package org.apache.camel.converter.stream; import java.io.ByteArrayInputStream; import java.io.InputStream; import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; - import javax.xml.transform.Source; import javax.xml.transform.sax.SAXSource; import javax.xml.transform.stream.StreamSource; @@ -51,6 +48,8 @@ public class StreamCacheConverterTest extends ContextTestSupport { } public void testConvertToStreamCache() throws Exception { + context.start(); + ByteArrayInputStream inputStream = new ByteArrayInputStream(MESSAGE.getBytes()); StreamCache streamCache = StreamCacheConverter.convertToStreamCache(new SAXSource(new InputSource(inputStream)), exchange); String message = exchange.getContext().getTypeConverter().convertTo(String.class, streamCache); @@ -59,6 +58,8 @@ public class StreamCacheConverterTest extends ContextTestSupport { } public void testConvertToStreamCacheStreamSource() throws Exception { + context.start(); + StreamSource source = new StreamSource(getTestFileStream()); StreamCache cache = 
StreamCacheConverter.convertToStreamCache(source, exchange); //assert re-readability of the cached StreamSource @@ -69,6 +70,8 @@ public class StreamCacheConverterTest extends ContextTestSupport { } public void testConvertToStreamCacheInputStream() throws Exception { + context.start(); + InputStream is = getTestFileStream(); InputStream cache = (InputStream)StreamCacheConverter.convertToStreamCache(is, exchange); //assert re-readability of the cached InputStream @@ -77,10 +80,10 @@ public class StreamCacheConverterTest extends ContextTestSupport { } public void testConvertToStreamCacheInputStreamWithFileCache() throws Exception { - // set up the properties - Map<String, String> properties = new HashMap<String, String>(); - properties.put(CachedOutputStream.THRESHOLD, "1"); - exchange.getContext().setProperties(properties); + exchange.getContext().getStreamCachingStrategy().setSpoolThreshold(1); + + context.start(); + InputStream is = getTestFileStream(); InputStream cache = (InputStream)StreamCacheConverter.convertToStreamCache(is, exchange); assertNotNull(IOConverter.toString(cache, null)); @@ -96,6 +99,8 @@ public class StreamCacheConverterTest extends ContextTestSupport { } public void testConvertToSerializable() throws Exception { + context.start(); + InputStream is = getTestFileStream(); StreamCache cache = StreamCacheConverter.convertToStreamCache(is, exchange); Serializable ser = StreamCacheConverter.convertToSerializable(cache, exchange); @@ -103,6 +108,8 @@ public class StreamCacheConverterTest extends ContextTestSupport { } public void testConvertToByteArray() throws Exception { + context.start(); + InputStream is = getTestFileStream(); StreamCache cache = StreamCacheConverter.convertToStreamCache(is, exchange); byte[] bytes = StreamCacheConverter.convertToByteArray(cache, exchange); @@ -114,4 +121,9 @@ public class StreamCacheConverterTest extends ContextTestSupport { assertNotNull("Should have found the file: " + TEST_FILE + " on the classpath", 
answer); return answer; } + + @Override + public boolean isUseRouteBuilder() { + return false; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/d0235489/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamCacheTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamCacheTest.java b/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamCacheTest.java index 9f71441..d17410e 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamCacheTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamCacheTest.java @@ -48,8 +48,8 @@ public class SplitterStreamCacheTest extends ContextTestSupport { return new RouteBuilder() { public void configure() { //ensure stream is spooled to disk - getContext().getProperties().put(CachedOutputStream.TEMP_DIR, "target/tmp"); - getContext().getProperties().put(CachedOutputStream.THRESHOLD, "1"); + context.getStreamCachingStrategy().setTemporaryDirectory("target/tmp"); + context.getStreamCachingStrategy().setSpoolThreshold(-1); from("seda:parallel?concurrentConsumers=5").streamCaching() .split(XPathBuilder.xpath("//person/city")) http://git-wip-us.apache.org/repos/asf/camel/blob/d0235489/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java ---------------------------------------------------------------------- diff --git a/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java b/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java index d521e7d..1454233 100644 --- a/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java +++ b/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java @@ -79,6 +79,7 @@ import 
org.apache.camel.spi.PackageScanClassResolver; import org.apache.camel.spi.PackageScanFilter; import org.apache.camel.spi.ProcessorFactory; import org.apache.camel.spi.ShutdownStrategy; +import org.apache.camel.spi.StreamCachingStrategy; import org.apache.camel.spi.ThreadPoolFactory; import org.apache.camel.spi.ThreadPoolProfile; import org.apache.camel.spi.UuidGenerator; @@ -789,5 +790,10 @@ public abstract class AbstractCamelContextFactoryBean<T extends ModelCamelContex LOG.info("Using custom NodeIdFactory: " + nodeIdFactory); getContext().setNodeIdFactory(nodeIdFactory); } + StreamCachingStrategy streamCachingStrategy = getBeanForType(StreamCachingStrategy.class); + if (streamCachingStrategy != null) { + LOG.info("Using custom StreamCachingStrategy: " + streamCachingStrategy); + getContext().setStreamCachingStrategy(streamCachingStrategy); + } } }