Updated Branches: refs/heads/master b026ba010 -> 65ed7ff30
CAMEL-6476: Introducing StreamCachingStrategy SPI to make it easier to configure and allow 3rd party to plugin custom strategies. Work in progress. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/65ed7ff3 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/65ed7ff3 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/65ed7ff3 Branch: refs/heads/master Commit: 65ed7ff30e75cfa65ac2c17f967950bbc5af3944 Parents: b026ba0 Author: Claus Ibsen <davscl...@apache.org> Authored: Wed Jul 17 18:56:53 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Jul 17 18:56:53 2013 +0200 ---------------------------------------------------------------------- .../mbean/ManagedStreamCachingStrategyMBean.java | 3 +++ .../org/apache/camel/impl/DefaultCamelContext.java | 10 +++++++--- .../camel/impl/DefaultStreamCachingStrategy.java | 14 ++++++++++++++ .../mbean/ManagedStreamCachingStrategy.java | 4 ++++ .../org/apache/camel/spi/StreamCachingStrategy.java | 9 +++++++++ .../management/ManagedStreamCachingStrategyTest.java | 4 +++- .../org/apache/camel/karaf/commands/ContextInfo.java | 10 ++++++++++ 7 files changed, 50 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/65ed7ff3/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java index 16e8918..4ce4583 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java @@ -20,6 +20,9 @@ import org.apache.camel.api.management.ManagedAttribute; public interface ManagedStreamCachingStrategyMBean { + @ManagedAttribute(description = "Whether stream caching is enabled") + boolean isEnabled(); + @ManagedAttribute(description = "Directory used when overflow and spooling to disk") String getSpoolDirectory(); http://git-wip-us.apache.org/repos/asf/camel/blob/65ed7ff3/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java index b977979..e431f3f 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java @@ -196,7 +196,7 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon private FactoryFinderResolver factoryFinderResolver = new DefaultFactoryFinderResolver(); private FactoryFinder defaultFactoryFinder; private PropertiesComponent propertiesComponent; - private StreamCachingStrategy streamCachingStrategy = new DefaultStreamCachingStrategy(); + private StreamCachingStrategy streamCachingStrategy; private final Map<String, FactoryFinder> factories = new HashMap<String, FactoryFinder>(); private final Map<String, RouteService> routeServices = new LinkedHashMap<String, RouteService>(); private final Map<String, RouteService> suspendedRouteServices = new LinkedHashMap<String, RouteService>(); @@ -1677,7 +1677,8 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon } if (streamCachingInUse) { // stream caching is in use so enable the strategy - addService(streamCachingStrategy); + getStreamCachingStrategy().setEnabled(true); + addService(getStreamCachingStrategy()); } // start routes @@ -2179,7 +2180,7 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon // but only add if we haven't already registered it before (we dont want to double add when restarting) boolean found = false; for (RouteStartupOrder other : routeStartupOrder) { - if (other.getRoute().getId() == route.getId()) { + if (other.getRoute().getId().equals(route.getId())) { found = true; break; } @@ -2660,6 +2661,9 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon } public StreamCachingStrategy getStreamCachingStrategy() { + if (streamCachingStrategy == null) { + streamCachingStrategy = new DefaultStreamCachingStrategy(); + } return streamCachingStrategy; } http://git-wip-us.apache.org/repos/asf/camel/blob/65ed7ff3/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java index d81c244..a19af7e 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java @@ -47,6 +47,7 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi private static final Logger LOG = LoggerFactory.getLogger(DefaultStreamCachingStrategy.class); private CamelContext camelContext; + private boolean enabled; private File spoolDirectory; private long spoolThreshold = StreamCache.DEFAULT_SPOOL_THRESHOLD; private String spoolChiper; @@ -61,6 +62,14 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi this.camelContext = camelContext; } + public boolean isEnabled() { + return enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + public void setSpoolDirectory(String path) { this.spoolDirectory = new File(path); } @@ -107,6 +116,11 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi @Override protected void doStart() throws Exception { + if (!enabled) { + LOG.info("StreamCaching is not enabled"); + return; + } + String bufferSize = camelContext.getProperty(BUFFER_SIZE); String hold = camelContext.getProperty(THRESHOLD); String chiper = camelContext.getProperty(CIPHER_TRANSFORMATION); http://git-wip-us.apache.org/repos/asf/camel/blob/65ed7ff3/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java index 26f1a19..7a7d511 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java @@ -46,6 +46,10 @@ public class ManagedStreamCachingStrategy extends ManagedService implements Mana return streamCachingStrategy; } + public boolean isEnabled() { + return streamCachingStrategy.isEnabled(); + } + public String getSpoolDirectory() { return streamCachingStrategy.getSpoolDirectory().getPath(); } http://git-wip-us.apache.org/repos/asf/camel/blob/65ed7ff3/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java b/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java index 797ee8f..6b0228b 100644 --- a/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java @@ -26,6 +26,15 @@ import org.apache.camel.Service; public interface StreamCachingStrategy extends Service { /** + * Sets whether the stream caching is enabled. + * <p/> + * <b>Notice:</b> This cannot be changed at runtime. + */ + void setEnabled(boolean enabled); + + boolean isEnabled(); + + /** * Sets the spool (temporary) directory to use for overflow and spooling to disk. * <p/> * If no spool directory has been explicit configured, then a temporary directory http://git-wip-us.apache.org/repos/asf/camel/blob/65ed7ff3/camel-core/src/test/java/org/apache/camel/management/ManagedStreamCachingStrategyTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedStreamCachingStrategyTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedStreamCachingStrategyTest.java index 8823287..b80b0b1 100644 --- a/camel-core/src/test/java/org/apache/camel/management/ManagedStreamCachingStrategyTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedStreamCachingStrategyTest.java @@ -45,7 +45,9 @@ public class ManagedStreamCachingStrategyTest extends ManagementTestSupport { } assertNotNull("Cannot find DefaultStreamCachingStrategy", name); - // is disabled by default + Boolean enabled = (Boolean) mbeanServer.getAttribute(name, "Enabled"); + assertEquals(Boolean.TRUE, enabled); + String dir = (String) mbeanServer.getAttribute(name, "SpoolDirectory"); assertEquals("target/cachedir", dir); http://git-wip-us.apache.org/repos/asf/camel/blob/65ed7ff3/platforms/karaf/commands/src/main/java/org/apache/camel/karaf/commands/ContextInfo.java ---------------------------------------------------------------------- diff --git a/platforms/karaf/commands/src/main/java/org/apache/camel/karaf/commands/ContextInfo.java b/platforms/karaf/commands/src/main/java/org/apache/camel/karaf/commands/ContextInfo.java index 493b8b2..1098a82 100644 --- a/platforms/karaf/commands/src/main/java/org/apache/camel/karaf/commands/ContextInfo.java +++ b/platforms/karaf/commands/src/main/java/org/apache/camel/karaf/commands/ContextInfo.java @@ -144,6 +144,16 @@ public class ContextInfo extends OsgiCommandSupport { camelContext.getTypeConverterRegistry().getStatistics().getFailedCounter()))); } + // add stream caching details if enabled + if (camelContext.getStreamCachingStrategy().isEnabled()) { + System.out.println(StringEscapeUtils.unescapeJava(String.format("\tStreamCachingStrategy: [spoolDirectory=%s, spoolThreshold=%s, spoolChiper=%s, bufferSize=%s, removeSpoolDirectoryWhenStopping=%s]", + camelContext.getStreamCachingStrategy().getSpoolDirectory(), + camelContext.getStreamCachingStrategy().getSpoolThreshold(), + camelContext.getStreamCachingStrategy().getSpoolChiper(), + camelContext.getStreamCachingStrategy().getBufferSize(), + camelContext.getStreamCachingStrategy().isRemoveSpoolDirectoryWhenStopping()))); + } + long activeRoutes = 0; long inactiveRoutes = 0; List<Route> routeList = camelContext.getRoutes();