CAMEL-6476: Introducing StreamCachingStrategy SPI to make it easier to configure and allow 3rd parties to plug in custom strategies. Work in progress.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b026ba01 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b026ba01 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b026ba01 Branch: refs/heads/master Commit: b026ba010425435ee87a126c1b54e65ca082b3da Parents: 1471c61 Author: Claus Ibsen <davscl...@apache.org> Authored: Wed Jul 17 18:43:07 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Jul 17 18:43:07 2013 +0200 ---------------------------------------------------------------------- .../ManagedStreamCachingStrategyMBean.java | 47 ++++++++++++ .../DefaultManagementLifecycleStrategy.java | 4 + .../mbean/ManagedStreamCachingStrategy.java | 80 ++++++++++++++++++++ .../apache/camel/spi/StreamCachingStrategy.java | 4 +- .../ManagedStreamCachingStrategyTest.java | 80 ++++++++++++++++++++ 5 files changed, 214 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/b026ba01/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java new file mode 100644 index 0000000..16e8918 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.api.management.mbean; + +import org.apache.camel.api.management.ManagedAttribute; + +public interface ManagedStreamCachingStrategyMBean { + + @ManagedAttribute(description = "Directory used when overflow and spooling to disk") + String getSpoolDirectory(); + + @ManagedAttribute(description = "Chiper used if writing with encryption") + String getSpoolChiper(); + + @ManagedAttribute(description = "Threshold in bytes when overflow and spooling to disk instead of keeping in memory") + void setSpoolThreshold(long threshold); + + @ManagedAttribute(description = "Threshold in bytes when overflow and spooling to disk instead of keeping in memory") + long getSpoolThreshold(); + + @ManagedAttribute(description = "Buffer size in bytes to use when coping between buffers") + void setBufferSize(int bufferSize); + + @ManagedAttribute(description = "Buffer size in bytes to use when coping between buffers") + int getBufferSize(); + + @ManagedAttribute(description = "Whether to remove spool directory when stopping") + void setRemoveSpoolDirectoryWhenStopping(boolean remove); + + @ManagedAttribute(description = "Whether to remove spool directory when stopping") + boolean isRemoveSpoolDirectoryWhenStopping(); + +} http://git-wip-us.apache.org/repos/asf/camel/blob/b026ba01/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java 
---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java index 9b63390..441d832 100644 --- a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java @@ -60,6 +60,7 @@ import org.apache.camel.management.mbean.ManagedEndpointRegistry; import org.apache.camel.management.mbean.ManagedProducerCache; import org.apache.camel.management.mbean.ManagedRoute; import org.apache.camel.management.mbean.ManagedService; +import org.apache.camel.management.mbean.ManagedStreamCachingStrategy; import org.apache.camel.management.mbean.ManagedThrottlingInflightRoutePolicy; import org.apache.camel.management.mbean.ManagedTracer; import org.apache.camel.management.mbean.ManagedTypeConverterRegistry; @@ -83,6 +84,7 @@ import org.apache.camel.spi.ManagementNameStrategy; import org.apache.camel.spi.ManagementObjectStrategy; import org.apache.camel.spi.ManagementStrategy; import org.apache.camel.spi.RouteContext; +import org.apache.camel.spi.StreamCachingStrategy; import org.apache.camel.spi.TypeConverterRegistry; import org.apache.camel.spi.UnitOfWork; import org.apache.camel.support.ServiceSupport; @@ -458,6 +460,8 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement answer = new ManagedEndpointRegistry(context, (EndpointRegistry) service); } else if (service instanceof TypeConverterRegistry) { answer = new ManagedTypeConverterRegistry(context, (TypeConverterRegistry) service); + } else if (service instanceof StreamCachingStrategy) { + answer = new ManagedStreamCachingStrategy(context, (StreamCachingStrategy) service); } else if (service != null) { // fallback as generic service answer = 
getManagementObjectStrategy().getManagedObjectForService(context, service); http://git-wip-us.apache.org/repos/asf/camel/blob/b026ba01/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java new file mode 100644 index 0000000..26f1a19 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.management.mbean; + +import org.apache.camel.CamelContext; +import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.api.management.mbean.ManagedStreamCachingStrategyMBean; +import org.apache.camel.spi.ManagementStrategy; +import org.apache.camel.spi.StreamCachingStrategy; + +@ManagedResource(description = "Managed StreamCachingStrategy") +public class ManagedStreamCachingStrategy extends ManagedService implements ManagedStreamCachingStrategyMBean { + + private final CamelContext camelContext; + private final StreamCachingStrategy streamCachingStrategy; + + public ManagedStreamCachingStrategy(CamelContext camelContext, StreamCachingStrategy streamCachingStrategy) { + super(camelContext, streamCachingStrategy); + this.camelContext = camelContext; + this.streamCachingStrategy = streamCachingStrategy; + } + + public void init(ManagementStrategy strategy) { + // do nothing + } + + public CamelContext getCamelContext() { + return camelContext; + } + + public StreamCachingStrategy getStreamCachingStrategy() { + return streamCachingStrategy; + } + + public String getSpoolDirectory() { + return streamCachingStrategy.getSpoolDirectory().getPath(); + } + + public String getSpoolChiper() { + return streamCachingStrategy.getSpoolChiper(); + } + + public void setSpoolThreshold(long threshold) { + streamCachingStrategy.setSpoolThreshold(threshold); + } + + public long getSpoolThreshold() { + return streamCachingStrategy.getSpoolThreshold(); + } + + public void setBufferSize(int bufferSize) { + streamCachingStrategy.setBufferSize(bufferSize); + } + + public int getBufferSize() { + return streamCachingStrategy.getBufferSize(); + } + + public void setRemoveSpoolDirectoryWhenStopping(boolean remove) { + streamCachingStrategy.setRemoveSpoolDirectoryWhenStopping(remove); + } + + public boolean isRemoveSpoolDirectoryWhenStopping() { + return streamCachingStrategy.isRemoveSpoolDirectoryWhenStopping(); + } +} 
http://git-wip-us.apache.org/repos/asf/camel/blob/b026ba01/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java b/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java index 217a0fc..797ee8f 100644 --- a/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java @@ -18,10 +18,12 @@ package org.apache.camel.spi; import java.io.File; +import org.apache.camel.Service; + /** * Strategy for using <a href="http://camel.apache.org/stream-caching.html">stream caching</a>. */ -public interface StreamCachingStrategy { +public interface StreamCachingStrategy extends Service { /** * Sets the spool (temporary) directory to use for overflow and spooling to disk. http://git-wip-us.apache.org/repos/asf/camel/blob/b026ba01/camel-core/src/test/java/org/apache/camel/management/ManagedStreamCachingStrategyTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedStreamCachingStrategyTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedStreamCachingStrategyTest.java new file mode 100644 index 0000000..8823287 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedStreamCachingStrategyTest.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management; + +import java.util.Set; +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import org.apache.camel.StreamCache; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.util.IOHelper; + +/** + * @version + */ +public class ManagedStreamCachingStrategyTest extends ManagementTestSupport { + + public void testStreamCachingStrategy() throws Exception { + MBeanServer mbeanServer = getMBeanServer(); + + ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,*"); + + // number of services + Set<ObjectName> names = mbeanServer.queryNames(on, null); + ObjectName name = null; + for (ObjectName service : names) { + if (service.toString().contains("DefaultStreamCachingStrategy")) { + name = service; + break; + } + } + assertNotNull("Cannot find DefaultStreamCachingStrategy", name); + + // is disabled by default + String dir = (String) mbeanServer.getAttribute(name, "SpoolDirectory"); + assertEquals("target/cachedir", dir); + + Long threshold = (Long) mbeanServer.getAttribute(name, "SpoolThreshold"); + assertEquals(StreamCache.DEFAULT_SPOOL_THRESHOLD, threshold.longValue()); + + Integer size = (Integer) mbeanServer.getAttribute(name, "BufferSize"); + assertEquals(IOHelper.DEFAULT_BUFFER_SIZE, size.intValue()); + + String chiper = (String) mbeanServer.getAttribute(name, "SpoolChiper"); + assertNull(chiper); + + Boolean remove = (Boolean) mbeanServer.getAttribute(name, "RemoveSpoolDirectoryWhenStopping"); + assertEquals(Boolean.TRUE, 
remove); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + context.setStreamCaching(true); + context.getStreamCachingStrategy().setSpoolDirectory("target/cachedir"); + + from("direct:start").routeId("foo") + .convertBodyTo(int.class) + .to("mock:a"); + } + }; + } + +}