CAMEL-6476: Introducing StreamCachingStrategy SPI to make it easier to 
configure and allow 3rd parties to plug in custom strategies. Work in progress.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b026ba01
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b026ba01
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b026ba01

Branch: refs/heads/master
Commit: b026ba010425435ee87a126c1b54e65ca082b3da
Parents: 1471c61
Author: Claus Ibsen <davscl...@apache.org>
Authored: Wed Jul 17 18:43:07 2013 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed Jul 17 18:43:07 2013 +0200

----------------------------------------------------------------------
 .../ManagedStreamCachingStrategyMBean.java      | 47 ++++++++++++
 .../DefaultManagementLifecycleStrategy.java     |  4 +
 .../mbean/ManagedStreamCachingStrategy.java     | 80 ++++++++++++++++++++
 .../apache/camel/spi/StreamCachingStrategy.java |  4 +-
 .../ManagedStreamCachingStrategyTest.java       | 80 ++++++++++++++++++++
 5 files changed, 214 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/b026ba01/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java
 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java
new file mode 100644
index 0000000..16e8918
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.api.management.mbean;
+
+import org.apache.camel.api.management.ManagedAttribute;
+
+public interface ManagedStreamCachingStrategyMBean {
+
+    @ManagedAttribute(description = "Directory used when overflow and spooling 
to disk")
+    String getSpoolDirectory();
+
+    @ManagedAttribute(description = "Chiper used if writing with encryption")
+    String getSpoolChiper();
+
+    @ManagedAttribute(description = "Threshold in bytes when overflow and 
spooling to disk instead of keeping in memory")
+    void setSpoolThreshold(long threshold);
+
+    @ManagedAttribute(description = "Threshold in bytes when overflow and 
spooling to disk instead of keeping in memory")
+    long getSpoolThreshold();
+
+    @ManagedAttribute(description = "Buffer size in bytes to use when coping 
between buffers")
+    void setBufferSize(int bufferSize);
+
+    @ManagedAttribute(description = "Buffer size in bytes to use when coping 
between buffers")
+    int getBufferSize();
+
+    @ManagedAttribute(description = "Whether to remove spool directory when 
stopping")
+    void setRemoveSpoolDirectoryWhenStopping(boolean remove);
+
+    @ManagedAttribute(description = "Whether to remove spool directory when 
stopping")
+    boolean isRemoveSpoolDirectoryWhenStopping();
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b026ba01/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
 
b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
index 9b63390..441d832 100644
--- 
a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
+++ 
b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
@@ -60,6 +60,7 @@ import 
org.apache.camel.management.mbean.ManagedEndpointRegistry;
 import org.apache.camel.management.mbean.ManagedProducerCache;
 import org.apache.camel.management.mbean.ManagedRoute;
 import org.apache.camel.management.mbean.ManagedService;
+import org.apache.camel.management.mbean.ManagedStreamCachingStrategy;
 import org.apache.camel.management.mbean.ManagedThrottlingInflightRoutePolicy;
 import org.apache.camel.management.mbean.ManagedTracer;
 import org.apache.camel.management.mbean.ManagedTypeConverterRegistry;
@@ -83,6 +84,7 @@ import org.apache.camel.spi.ManagementNameStrategy;
 import org.apache.camel.spi.ManagementObjectStrategy;
 import org.apache.camel.spi.ManagementStrategy;
 import org.apache.camel.spi.RouteContext;
+import org.apache.camel.spi.StreamCachingStrategy;
 import org.apache.camel.spi.TypeConverterRegistry;
 import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.ServiceSupport;
@@ -458,6 +460,8 @@ public class DefaultManagementLifecycleStrategy extends 
ServiceSupport implement
             answer = new ManagedEndpointRegistry(context, (EndpointRegistry) 
service);
         } else if (service instanceof TypeConverterRegistry) {
             answer = new ManagedTypeConverterRegistry(context, 
(TypeConverterRegistry) service);
+        } else if (service instanceof StreamCachingStrategy) {
+            answer = new ManagedStreamCachingStrategy(context, 
(StreamCachingStrategy) service);
         } else if (service != null) {
             // fallback as generic service
             answer = 
getManagementObjectStrategy().getManagedObjectForService(context, service);

http://git-wip-us.apache.org/repos/asf/camel/blob/b026ba01/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java
 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java
new file mode 100644
index 0000000..26f1a19
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management.mbean;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.api.management.mbean.ManagedStreamCachingStrategyMBean;
+import org.apache.camel.spi.ManagementStrategy;
+import org.apache.camel.spi.StreamCachingStrategy;
+
+@ManagedResource(description = "Managed StreamCachingStrategy")
+public class ManagedStreamCachingStrategy extends ManagedService implements 
ManagedStreamCachingStrategyMBean {
+
+    private final CamelContext camelContext;
+    private final StreamCachingStrategy streamCachingStrategy;
+
+    public ManagedStreamCachingStrategy(CamelContext camelContext, 
StreamCachingStrategy streamCachingStrategy) {
+        super(camelContext, streamCachingStrategy);
+        this.camelContext = camelContext;
+        this.streamCachingStrategy = streamCachingStrategy;
+    }
+
+    public void init(ManagementStrategy strategy) {
+        // do nothing
+    }
+
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    public StreamCachingStrategy getStreamCachingStrategy() {
+        return streamCachingStrategy;
+    }
+
+    public String getSpoolDirectory() {
+        return streamCachingStrategy.getSpoolDirectory().getPath();
+    }
+
+    public String getSpoolChiper() {
+        return streamCachingStrategy.getSpoolChiper();
+    }
+
+    public void setSpoolThreshold(long threshold) {
+        streamCachingStrategy.setSpoolThreshold(threshold);
+    }
+
+    public long getSpoolThreshold() {
+        return streamCachingStrategy.getSpoolThreshold();
+    }
+
+    public void setBufferSize(int bufferSize) {
+        streamCachingStrategy.setBufferSize(bufferSize);
+    }
+
+    public int getBufferSize() {
+        return streamCachingStrategy.getBufferSize();
+    }
+
+    public void setRemoveSpoolDirectoryWhenStopping(boolean remove) {
+        streamCachingStrategy.setRemoveSpoolDirectoryWhenStopping(remove);
+    }
+
+    public boolean isRemoveSpoolDirectoryWhenStopping() {
+        return streamCachingStrategy.isRemoveSpoolDirectoryWhenStopping();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b026ba01/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java 
b/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java
index 217a0fc..797ee8f 100644
--- a/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java
@@ -18,10 +18,12 @@ package org.apache.camel.spi;
 
 import java.io.File;
 
+import org.apache.camel.Service;
+
 /**
  * Strategy for using <a 
href="http://camel.apache.org/stream-caching.html">stream caching</a>.
  */
-public interface StreamCachingStrategy {
+public interface StreamCachingStrategy extends Service {
 
     /**
      * Sets the spool (temporary) directory to use for overflow and spooling 
to disk.

http://git-wip-us.apache.org/repos/asf/camel/blob/b026ba01/camel-core/src/test/java/org/apache/camel/management/ManagedStreamCachingStrategyTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/management/ManagedStreamCachingStrategyTest.java
 
b/camel-core/src/test/java/org/apache/camel/management/ManagedStreamCachingStrategyTest.java
new file mode 100644
index 0000000..8823287
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/management/ManagedStreamCachingStrategyTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import java.util.Set;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.camel.StreamCache;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.util.IOHelper;
+
+/**
+ * @version 
+ */
+public class ManagedStreamCachingStrategyTest extends ManagementTestSupport {
+
+    public void testStreamCachingStrategy() throws Exception {
+        MBeanServer mbeanServer = getMBeanServer();
+
+        ObjectName on = 
ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,*");
+
+        // number of services
+        Set<ObjectName> names = mbeanServer.queryNames(on, null);
+        ObjectName name = null;
+        for (ObjectName service : names) {
+            if (service.toString().contains("DefaultStreamCachingStrategy")) {
+                name = service;
+                break;
+            }
+        }
+        assertNotNull("Cannot find DefaultStreamCachingStrategy", name);
+
+        // is disabled by default
+        String dir = (String) mbeanServer.getAttribute(name, "SpoolDirectory");
+        assertEquals("target/cachedir", dir);
+
+        Long threshold = (Long) mbeanServer.getAttribute(name, 
"SpoolThreshold");
+        assertEquals(StreamCache.DEFAULT_SPOOL_THRESHOLD, 
threshold.longValue());
+
+        Integer size = (Integer) mbeanServer.getAttribute(name, "BufferSize");
+        assertEquals(IOHelper.DEFAULT_BUFFER_SIZE, size.intValue());
+
+        String chiper = (String) mbeanServer.getAttribute(name, "SpoolChiper");
+        assertNull(chiper);
+
+        Boolean remove = (Boolean) mbeanServer.getAttribute(name, 
"RemoveSpoolDirectoryWhenStopping");
+        assertEquals(Boolean.TRUE, remove);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.setStreamCaching(true);
+                
context.getStreamCachingStrategy().setSpoolDirectory("target/cachedir");
+
+                from("direct:start").routeId("foo")
+                    .convertBodyTo(int.class)
+                    .to("mock:a");
+            }
+        };
+    }
+
+}

Reply via email to