CAMEL-6476: Introducing StreamCachingStrategy SPI to make it easier to configure and allow 3rd party to plugin custom strategies. Work in progress.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3f1f901a Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3f1f901a Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3f1f901a Branch: refs/heads/master Commit: 3f1f901a0cb6c52f8e7a10768fe1114f0a294b32 Parents: f0180ca Author: Claus Ibsen <davscl...@apache.org> Authored: Wed Jul 17 13:05:49 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Jul 17 13:05:49 2013 +0200 ---------------------------------------------------------------------- .../java/org/apache/camel/CamelContext.java | 13 +++- .../converter/stream/CachedOutputStream.java | 12 ++-- .../apache/camel/impl/DefaultCamelContext.java | 13 ++++ .../impl/DefaultStreamCachingStrategy.java | 68 ++++++++++++++++++++ .../apache/camel/spi/StreamCachingStrategy.java | 29 +++++++++ .../java/org/apache/camel/util/FileUtil.java | 33 ++++++---- 6 files changed, 147 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/3f1f901a/camel-core/src/main/java/org/apache/camel/CamelContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/CamelContext.java b/camel-core/src/main/java/org/apache/camel/CamelContext.java index 9e747c5..6db32ba 100644 --- a/camel-core/src/main/java/org/apache/camel/CamelContext.java +++ b/camel-core/src/main/java/org/apache/camel/CamelContext.java @@ -51,6 +51,7 @@ import org.apache.camel.spi.Registry; import org.apache.camel.spi.RouteStartupOrder; import org.apache.camel.spi.ServicePool; import org.apache.camel.spi.ShutdownStrategy; +import org.apache.camel.spi.StreamCachingStrategy; import org.apache.camel.spi.TypeConverterRegistry; import org.apache.camel.spi.UuidGenerator; import org.apache.camel.util.LoadPropertiesException; @@ -1207,9 +1208,19 @@ public interface CamelContext extends SuspendableService, RuntimeConfiguration { */ Map<String, Properties> findComponents() throws LoadPropertiesException, IOException; - /** * Returns the HTML documentation for the given camel component */ String getComponentDocumentation(String componentName) throws IOException; + + /** + * Gets the {@link StreamCachingStrategy} to use. + */ + StreamCachingStrategy getStreamCachingStrategy(); + + /** + * Sets a custom {@link StreamCachingStrategy} to use. + */ + void setStreamCachingStrategy(StreamCachingStrategy streamCachingStrategy); + } http://git-wip-us.apache.org/repos/asf/camel/blob/3f1f901a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java index a2cf0a1..f12a111 100644 --- a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java +++ b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java @@ -55,7 +55,7 @@ public class CachedOutputStream extends OutputStream { public static final String TEMP_DIR = "CamelCachedOutputStreamOutputDirectory"; public static final String CIPHER_TRANSFORMATION = "CamelCachedOutputStreamCipherTransformation"; private static final transient Logger LOG = LoggerFactory.getLogger(CachedOutputStream.class); - + private OutputStream currentStream; private boolean inMemory = true; private int totalLength; @@ -68,12 +68,12 @@ public class CachedOutputStream extends OutputStream { private String cipherTransformation; private CipherPair ciphers; - public CachedOutputStream(Exchange exchange) { this(exchange, true); } public CachedOutputStream(Exchange exchange, boolean closedOnCompletion) { + // TODO: these options should be on StreamCachingStrategy String bufferSize = exchange.getContext().getProperty(BUFFER_SIZE); String hold = exchange.getContext().getProperty(THRESHOLD); String dir = exchange.getContext().getProperty(TEMP_DIR); @@ -86,6 +86,8 @@ public class CachedOutputStream extends OutputStream { } if (dir != null) { this.outputDir = exchange.getContext().getTypeConverter().convertTo(File.class, dir); + } else { + this.outputDir = exchange.getContext().getStreamCachingStrategy().getTemporaryDirectory(); } this.cipherTransformation = exchange.getContext().getProperty(CIPHER_TRANSFORMATION); @@ -222,11 +224,7 @@ public class CachedOutputStream extends OutputStream { flush(); ByteArrayOutputStream bout = (ByteArrayOutputStream)currentStream; - if (outputDir == null) { - tempFile = FileUtil.createTempFile("cos", ".tmp"); - } else { - tempFile = FileUtil.createTempFile("cos", ".tmp", outputDir); - } + tempFile = FileUtil.createTempFile("cos", ".tmp", outputDir); LOG.trace("Creating temporary stream cache file: {}", tempFile); http://git-wip-us.apache.org/repos/asf/camel/blob/3f1f901a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java index 9063095..e4017be 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java @@ -119,6 +119,7 @@ import org.apache.camel.spi.RouteContext; import org.apache.camel.spi.RouteStartupOrder; import org.apache.camel.spi.ServicePool; import org.apache.camel.spi.ShutdownStrategy; +import org.apache.camel.spi.StreamCachingStrategy; import org.apache.camel.spi.TypeConverterRegistry; import org.apache.camel.spi.UuidGenerator; import org.apache.camel.support.ServiceSupport; @@ -195,6 +196,7 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon private FactoryFinderResolver factoryFinderResolver = new DefaultFactoryFinderResolver(); private FactoryFinder defaultFactoryFinder; private PropertiesComponent propertiesComponent; + private StreamCachingStrategy streamCachingStrategy = new DefaultStreamCachingStrategy(); private final Map<String, FactoryFinder> factories = new HashMap<String, FactoryFinder>(); private final Map<String, RouteService> routeServices = new LinkedHashMap<String, RouteService>(); private final Map<String, RouteService> suspendedRouteServices = new LinkedHashMap<String, RouteService>(); @@ -1679,6 +1681,9 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon threshold = StreamCache.DEFAULT_SPOOL_THRESHOLD; } log.info("Stream caching is enabled, and using {} kb as threshold for overflow and spooling to disk store.", threshold / 1024); + + // stream caching is in use so enable the strategy + addService(streamCachingStrategy); } // start routes @@ -2660,6 +2665,14 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon this.uuidGenerator = uuidGenerator; } + public StreamCachingStrategy getStreamCachingStrategy() { + return streamCachingStrategy; + } + + public void setStreamCachingStrategy(StreamCachingStrategy streamCachingStrategy) { + this.streamCachingStrategy = streamCachingStrategy; + } + @Override public String getProperty(String name) { String value = getProperties().get(name); http://git-wip-us.apache.org/repos/asf/camel/blob/3f1f901a/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java new file mode 100644 index 0000000..3333d77 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import java.io.File; + +import org.apache.camel.spi.StreamCachingStrategy; +import org.apache.camel.util.FileUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DefaultStreamCachingStrategy extends org.apache.camel.support.ServiceSupport implements StreamCachingStrategy { + + // TODO: Add options to configure more stuff like overflow size et all + // TODO: Add JMX management + // TODO: Maybe use #syntax# for default temp dir so ppl can easily configure this + + private static final Logger LOG = LoggerFactory.getLogger(DefaultStreamCachingStrategy.class); + private File temporaryDirectory; + + public void setTemporaryDirectory(File path) { + this.temporaryDirectory = path; + } + + public File getTemporaryDirectory() { + return temporaryDirectory; + } + + @Override + protected void doStart() throws Exception { + // create random temporary directory if none has been created + if (temporaryDirectory == null) { + temporaryDirectory = FileUtil.createNewTempDir(); + LOG.info("Created temporary directory {}", temporaryDirectory); + } else { + if (!temporaryDirectory.exists()) { + boolean created = temporaryDirectory.mkdirs(); + if (!created) { + LOG.warn("Cannot create temporary directory {}", temporaryDirectory); + } else { + LOG.info("Created temporary directory {}", temporaryDirectory); + } + } + } + } + + @Override + protected void doStop() throws Exception { + if (temporaryDirectory != null) { + LOG.info("Removing temporary directory {}", temporaryDirectory); + FileUtil.removeDir(temporaryDirectory); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3f1f901a/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java b/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java new file mode 100644 index 0000000..cc5ad2e --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spi; + +import java.io.File; + +/** + * Strategy for using <a href="http://camel.apache.org/stream-caching.html">stream caching</a>. + */ +public interface StreamCachingStrategy { + + void setTemporaryDirectory(File path); + + File getTemporaryDirectory(); +} http://git-wip-us.apache.org/repos/asf/camel/blob/3f1f901a/camel-core/src/main/java/org/apache/camel/util/FileUtil.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/FileUtil.java b/camel-core/src/main/java/org/apache/camel/util/FileUtil.java index 339a5da..22b691d 100644 --- a/camel-core/src/main/java/org/apache/camel/util/FileUtil.java +++ b/camel-core/src/main/java/org/apache/camel/util/FileUtil.java @@ -285,6 +285,24 @@ public final class FileUtil { return defaultTempDir; } + defaultTempDir = createNewTempDir(); + + // create shutdown hook to remove the temp dir + shutdownHook = new Thread() { + @Override + public void run() { + removeDir(defaultTempDir); + } + }; + Runtime.getRuntime().addShutdownHook(shutdownHook); + + return defaultTempDir; + } + + /** + * Creates a new temporary directory in the <tt>java.io.tmpdir</tt> directory. + */ + public static File createNewTempDir() { String s = System.getProperty("java.io.tmpdir"); File checkExists = new File(s); if (!checkExists.exists()) { @@ -304,18 +322,7 @@ public final class FileUtil { f = new File(s, "camel-tmp-" + x); } - defaultTempDir = f; - - // create shutdown hook to remove the temp dir - shutdownHook = new Thread() { - @Override - public void run() { - removeDir(defaultTempDir); - } - }; - Runtime.getRuntime().addShutdownHook(shutdownHook); - - return defaultTempDir; + return f; } /** @@ -332,7 +339,7 @@ public final class FileUtil { } } - private static void removeDir(File d) { + public static void removeDir(File d) { String[] list = d.list(); if (list == null) { list = new String[0];