This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-3.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.x by this push: new 3790072c19b CAMEL-19601: StreamCaching can configure allow/deny list of classes to control what can be cached or not. 3790072c19b is described below commit 3790072c19b4bb8af28c5f06e4902db5a8691cb5 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Jul 13 14:21:35 2023 +0200 CAMEL-19601: StreamCaching can configure allow/deny list of classes to control what can be cached or not. --- .../main/camel-main-configuration-metadata.json | 2 + .../processor/SpringStreamCachingStrategyTest.java | 5 + .../processor/SpringStreamCachingStrategyTest.xml | 3 +- .../apache/camel/spi/StreamCachingStrategy.java | 37 +++++ .../impl/engine/DefaultStreamCachingStrategy.java | 87 ++++++++++- .../org/apache/camel/core/xml/streamCaching.json | 2 + .../core/xml/AbstractCamelContextFactoryBean.java | 8 + .../xml/CamelStreamCachingStrategyDefinition.java | 28 ++++ .../processor/StreamCachingAllowDenyTest.java | 161 +++++++++++++++++++++ .../MainConfigurationPropertiesConfigurer.java | 12 ++ .../camel-main-configuration-metadata.json | 2 + core/camel-main/src/main/docs/main.adoc | 4 +- .../camel/main/DefaultConfigurationConfigurer.java | 2 + .../camel/main/DefaultConfigurationProperties.java | 44 ++++++ .../mbean/ManagedStreamCachingStrategyMBean.java | 6 + .../mbean/ManagedStreamCachingStrategy.java | 22 +++ 16 files changed, 422 insertions(+), 3 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/main/camel-main-configuration-metadata.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/main/camel-main-configuration-metadata.json index 89497d39597..76b3094e958 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/main/camel-main-configuration-metadata.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/main/camel-main-configuration-metadata.json @@ -109,8 +109,10 @@ { "name": 
"camel.main.startupRecorderProfile", "description": "To use a specific Java Flight Recorder profile configuration, such as default or profile. The default is default.", "sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type": "string", "javaType": "java.lang.String", "defaultValue": "default" }, { "name": "camel.main.startupRecorderRecording", "description": "To enable Java Flight Recorder to start a recording and automatic dump the recording to disk after startup is complete. This requires that camel-jfr is on the classpath, and to enable this option.", "sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type": "boolean", "javaType": "boolean", "defaultValue": "false" }, { "name": "camel.main.startupSummaryLevel", "description": "Controls the level of information logged during startup (and shutdown) of CamelContext.", "sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type": "object", "javaType": "org.apache.camel.StartupSummaryLevel", "defaultValue": "Default" }, + { "name": "camel.main.streamCachingAllowClasses", "description": "To filter stream caching of a given set of allowed\/denied classes. By default, all classes that are java.io.InputStream is allowed. Multiple class names can be separated by comma.", "sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type": "string", "javaType": "java.lang.String" }, { "name": "camel.main.streamCachingAnySpoolRules", "description": "Sets whether if just any of the org.apache.camel.spi.StreamCachingStrategy.SpoolRule rules returns true then shouldSpoolCache(long) returns true, to allow spooling to disk. If this option is false, then all the org.apache.camel.spi.StreamCachingStrategy.SpoolRule must return true. The default value is false which means that all the rules must return true.", "sourceType": "org.apache.camel.main.DefaultConfigurationProp [...] 
{ "name": "camel.main.streamCachingBufferSize", "description": "Sets the stream caching buffer size to use when allocating in-memory buffers used for in-memory stream caches. The default size is 4096.", "sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type": "integer", "javaType": "int" }, + { "name": "camel.main.streamCachingDenyClasses", "description": "To filter stream caching of a given set of allowed\/denied classes. By default, all classes that are java.io.InputStream is allowed. Multiple class names can be separated by comma.", "sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type": "string", "javaType": "java.lang.String" }, { "name": "camel.main.streamCachingEnabled", "description": "Sets whether stream caching is enabled or not. While stream types (like StreamSource, InputStream and Reader) are commonly used in messaging for performance reasons, they also have an important drawback: they can only be read once. In order to be able to work with message content multiple times, the stream needs to be cached. Streams are cached in memory only (by default). If streamCachingSpoolEnabled=true, then, for large [...] { "name": "camel.main.streamCachingRemoveSpoolDirectoryWhenStopping", "description": "Whether to remove stream caching temporary directory when stopping. This option is default true.", "sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type": "boolean", "javaType": "boolean", "defaultValue": true }, { "name": "camel.main.streamCachingSpoolCipher", "description": "Sets a stream caching cipher name to use when spooling to disk to write with encryption. 
By default the data is not encrypted.", "sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type": "string", "javaType": "java.lang.String" }, diff --git a/components/camel-spring-xml/src/test/java/org/apache/camel/spring/processor/SpringStreamCachingStrategyTest.java b/components/camel-spring-xml/src/test/java/org/apache/camel/spring/processor/SpringStreamCachingStrategyTest.java index ee65654539f..9b00b47a6b1 100644 --- a/components/camel-spring-xml/src/test/java/org/apache/camel/spring/processor/SpringStreamCachingStrategyTest.java +++ b/components/camel-spring-xml/src/test/java/org/apache/camel/spring/processor/SpringStreamCachingStrategyTest.java @@ -34,11 +34,16 @@ public class SpringStreamCachingStrategyTest extends SpringTestSupport { @Test public void testStreamCaching() { + context.getStreamCachingStrategy().start(); + assertTrue(context.getStreamCachingStrategy().isEnabled()); assertEquals(normalizePath("target/cachedir"), normalizePath(context.getStreamCachingStrategy().getSpoolDirectory().toString())); assertEquals(Integer.valueOf(4096).intValue(), context.getStreamCachingStrategy().getBufferSize()); assertEquals(Long.valueOf(8192).longValue(), context.getStreamCachingStrategy().getSpoolThreshold()); + assertEquals("java.io.ByteArrayInputStream", + context.getStreamCachingStrategy().getAllowClasses().iterator().next().getName()); + assertEquals("java.io.Reader", context.getStreamCachingStrategy().getDenyClasses().iterator().next().getName()); } } diff --git a/components/camel-spring-xml/src/test/resources/org/apache/camel/spring/processor/SpringStreamCachingStrategyTest.xml b/components/camel-spring-xml/src/test/resources/org/apache/camel/spring/processor/SpringStreamCachingStrategyTest.xml index b1383a12cc2..7265f2db271 100644 --- a/components/camel-spring-xml/src/test/resources/org/apache/camel/spring/processor/SpringStreamCachingStrategyTest.xml +++ 
b/components/camel-spring-xml/src/test/resources/org/apache/camel/spring/processor/SpringStreamCachingStrategyTest.xml @@ -26,7 +26,8 @@ <camelContext streamCache="true" xmlns="http://camel.apache.org/schema/spring"> - <streamCaching id="myCacheConfig" spoolEnabled="true" bufferSize="2048" spoolDirectory="target/cachedir" spoolThreshold="8192"/> + <streamCaching id="myCacheConfig" allowClasses="java.io.ByteArrayInputStream" denyClasses="java.io.Reader" + spoolEnabled="true" bufferSize="2048" spoolDirectory="target/cachedir" spoolThreshold="8192"/> <route> <from uri="direct:c"/> diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java b/core/camel-api/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java index 3f27a0c2d52..68de83dd718 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java @@ -17,6 +17,7 @@ package org.apache.camel.spi; import java.io.File; +import java.util.Collection; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -115,6 +116,42 @@ public interface StreamCachingStrategy extends StaticService { boolean isEnabled(); + /** + * To filter stream caching of a given set of allowed/denied classes. By default, all classes that are + * {@link java.io.InputStream} is allowed. + */ + void setAllowClasses(Class<?>... classes); + + /** + * To filter stream caching of a given set of allowed/denied classes. By default, all classes that are + * {@link java.io.InputStream} is allowed. Multiple class names can be separated by comma. + */ + void setAllowClasses(String names); + + /** + * To filter stream caching of a given set of allowed/denied classes. By default, all classes that are + * {@link java.io.InputStream} is allowed. + */ + Collection<Class<?>> getAllowClasses(); + + /** + * To filter stream caching of a given set of allowed/denied classes. 
By default, all classes that are + * {@link java.io.InputStream} is allowed. + */ + void setDenyClasses(Class<?>... classes); + + /** + * To filter stream caching of a given set of allowed/denied classes. By default, all classes that are + * {@link java.io.InputStream} is allowed. Multiple class names can be separated by comma. + */ + void setDenyClasses(String names); + + /** + * To filter stream caching of a given set of allowed/denied classes. By default, all classes that are + * {@link java.io.InputStream} is allowed. + */ + Collection<Class<?>> getDenyClasses(); + /** * Enables spooling to disk. * <p/> diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java index 8e6cb33a4bb..f7513cb65f2 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java @@ -19,7 +19,10 @@ package org.apache.camel.impl.engine; import java.io.File; import java.lang.management.ManagementFactory; import java.lang.management.MemoryMXBean; +import java.util.ArrayList; +import java.util.Collection; import java.util.LinkedHashSet; +import java.util.List; import java.util.Set; import org.apache.camel.CamelContext; @@ -44,6 +47,10 @@ public class DefaultStreamCachingStrategy extends ServiceSupport implements Came private CamelContext camelContext; private boolean enabled; + private String allowClassNames; + private String denyClassNames; + private Collection<Class<?>> allowClasses; + private Collection<Class<?>> denyClasses; private boolean spoolEnabled; private File spoolDirectory; private transient String spoolDirectoryName = "${java.io.tmpdir}/camel/camel-tmp-#uuid#"; @@ -77,6 +84,34 @@ public class DefaultStreamCachingStrategy extends ServiceSupport implements Came this.enabled 
= enabled; } + @Override + public Collection<Class<?>> getAllowClasses() { + return allowClasses; + } + + public void setAllowClasses(Class<?>... allowClasses) { + this.allowClasses = List.of(allowClasses); + } + + @Override + public void setAllowClasses(String names) { + this.allowClassNames = names; + } + + @Override + public Collection<Class<?>> getDenyClasses() { + return denyClasses; + } + + public void setDenyClasses(Class<?>... denyClasses) { + this.denyClasses = List.of(denyClasses); + } + + @Override + public void setDenyClasses(String names) { + this.denyClassNames = names; + } + @Override public boolean isSpoolEnabled() { return spoolEnabled; @@ -223,7 +258,27 @@ public class DefaultStreamCachingStrategy extends ServiceSupport implements Came // try convert to stream cache Object body = message.getBody(); if (body != null) { - cache = camelContext.getTypeConverter().convertTo(StreamCache.class, message.getExchange(), body); + boolean allowed = allowClasses == null && denyClasses == null; + if (!allowed) { + Class<?> source = body.getClass(); + if (denyClasses != null && allowClasses != null) { + // deny takes precedence + allowed = !isAssignableFrom(source, denyClasses); + if (allowed) { + allowed = isAssignableFrom(source, allowClasses); + } + } else if (denyClasses != null) { + allowed = !isAssignableFrom(source, denyClasses); + } else { + allowed = isAssignableFrom(source, allowClasses); + } + if (LOG.isTraceEnabled()) { + LOG.trace("Cache stream from class: {} is {}", source, allowed ? 
"allowed" : "denied"); + } + } + if (allowed) { + cache = camelContext.getTypeConverter().convertTo(StreamCache.class, message.getExchange(), body); + } } if (cache != null) { if (LOG.isTraceEnabled()) { @@ -244,6 +299,15 @@ public class DefaultStreamCachingStrategy extends ServiceSupport implements Came return cache; } + protected static boolean isAssignableFrom(Class<?> source, Collection<Class<?>> targets) { + for (Class<?> target : targets) { + if (target.isAssignableFrom(source)) { + return true; + } + } + return false; + } + protected String resolveSpoolDirectory(String path) { if (camelContext.getManagementNameStrategy() != null) { String name = camelContext.getManagementNameStrategy().resolveManagementName(path, camelContext.getName(), false); @@ -288,6 +352,27 @@ public class DefaultStreamCachingStrategy extends ServiceSupport implements Came return; } + if (allowClassNames != null) { + if (allowClasses == null) { + allowClasses = new ArrayList<>(); + } + for (String name : allowClassNames.split(",")) { + name = name.trim(); + Class<?> clazz = camelContext.getClassResolver().resolveMandatoryClass(name); + allowClasses.add(clazz); + } + } + if (denyClassNames != null) { + if (denyClasses == null) { + denyClasses = new ArrayList<>(); + } + for (String name : denyClassNames.split(",")) { + name = name.trim(); + Class<?> clazz = camelContext.getClassResolver().resolveMandatoryClass(name); + denyClasses.add(clazz); + } + } + if (spoolUsedHeapMemoryThreshold > 99) { throw new IllegalArgumentException( "SpoolHeapMemoryWatermarkThreshold must not be higher than 99, was: " + spoolUsedHeapMemoryThreshold); diff --git a/core/camel-core-xml/src/generated/resources/org/apache/camel/core/xml/streamCaching.json b/core/camel-core-xml/src/generated/resources/org/apache/camel/core/xml/streamCaching.json index 3746e4c894b..7c5eeea90b4 100644 --- a/core/camel-core-xml/src/generated/resources/org/apache/camel/core/xml/streamCaching.json +++ 
b/core/camel-core-xml/src/generated/resources/org/apache/camel/core/xml/streamCaching.json @@ -14,6 +14,8 @@ "properties": { "enabled": { "kind": "attribute", "displayName": "Enabled", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Sets whether stream caching is enabled or not. While stream types (like StreamSource, InputStream and Reader) are commonly used in messaging for performance reasons, they also have an important drawback: they can only be read once. In order to be able to work w [...] "spoolEnabled": { "kind": "attribute", "displayName": "Spool Enabled", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "To enable stream caching spooling to disk. This means, for large stream messages (over 128 KB by default) will be cached in a temporary file instead, and Camel will handle deleting the temporary file once the cached stream is no longer necessary. De [...] + "allowClasses": { "kind": "attribute", "displayName": "Allow Classes", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "To filter stream caching of a given set of allowed\/denied classes. By default, all classes that are java.io.InputStream is allowed. Multiple class names can be separated by comma." }, + "denyClasses": { "kind": "attribute", "displayName": "Deny Classes", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "To filter stream caching of a given set of allowed\/denied classes. By default, all classes that are java.io.InputStream is allowed. Multiple class names can be separated by comma." 
}, "spoolDirectory": { "kind": "attribute", "displayName": "Spool Directory", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the spool (temporary) directory to use for overflow and spooling to disk. If no spool directory has been explicit configured, then a temporary directory is created in the java.io.tmpdir directory." }, "spoolCipher": { "kind": "attribute", "displayName": "Spool Cipher", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a cipher name to use when spooling to disk to write with encryption. By default the data is not encrypted." }, "spoolThreshold": { "kind": "attribute", "displayName": "Spool Threshold", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "131072", "description": "Threshold in bytes when overflow to disk is activated. The default threshold is org.apache.camel.StreamCache#DEFAULT_SPOOL_THRESHOLD bytes (eg 128kb). Use -1 to disable overflow to disk." 
}, diff --git a/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java b/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java index 44ce432a275..1a7535f1fa5 100644 --- a/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java +++ b/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java @@ -751,6 +751,14 @@ public abstract class AbstractCamelContextFactoryBean<T extends ModelCamelContex // stream-caching is default enabled getContext().getStreamCachingStrategy().setEnabled(true); } + String allowClasses = CamelContextHelper.parseText(getContext(), streamCaching.getAllowClasses()); + if (allowClasses != null) { + getContext().getStreamCachingStrategy().setAllowClasses(allowClasses); + } + String denyClasses = CamelContextHelper.parseText(getContext(), streamCaching.getDenyClasses()); + if (denyClasses != null) { + getContext().getStreamCachingStrategy().setDenyClasses(denyClasses); + } Boolean spoolEnabled = CamelContextHelper.parseBoolean(getContext(), streamCaching.getSpoolEnabled()); if (spoolEnabled != null) { getContext().getStreamCachingStrategy().setSpoolEnabled(spoolEnabled); diff --git a/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/CamelStreamCachingStrategyDefinition.java b/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/CamelStreamCachingStrategyDefinition.java index e0a7d37c222..9336d3140ed 100644 --- a/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/CamelStreamCachingStrategyDefinition.java +++ b/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/CamelStreamCachingStrategyDefinition.java @@ -41,6 +41,10 @@ public class CamelStreamCachingStrategyDefinition extends IdentifiedType { @Metadata(defaultValue = "false", javaType = "java.lang.Boolean") private String spoolEnabled; @XmlAttribute + private String allowClasses; + @XmlAttribute + 
private String denyClasses; + @XmlAttribute private String spoolDirectory; @XmlAttribute private String spoolCipher; @@ -89,6 +93,30 @@ public class CamelStreamCachingStrategyDefinition extends IdentifiedType { this.enabled = enabled; } + public String getAllowClasses() { + return allowClasses; + } + + /** + * To filter stream caching of a given set of allowed/denied classes. By default, all classes that are + * {@link java.io.InputStream} is allowed. Multiple class names can be separated by comma. + */ + public void setAllowClasses(String allowClasses) { + this.allowClasses = allowClasses; + } + + public String getDenyClasses() { + return denyClasses; + } + + /** + * To filter stream caching of a given set of allowed/denied classes. By default, all classes that are + * {@link java.io.InputStream} is allowed. Multiple class names can be separated by comma. + */ + public void setDenyClasses(String denyClasses) { + this.denyClasses = denyClasses; + } + public String getSpoolEnabled() { return spoolEnabled; } diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/StreamCachingAllowDenyTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/StreamCachingAllowDenyTest.java new file mode 100644 index 00000000000..48b9b7a5f39 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/StreamCachingAllowDenyTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.io.ByteArrayInputStream; +import java.io.Reader; +import java.io.StringReader; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.StreamCache; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class StreamCachingAllowDenyTest extends ContextTestSupport { + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + @Test + public void testAllow() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + context.getStreamCachingStrategy().setAllowClasses(ByteArrayInputStream.class); + from("direct:a").to("mock:a"); + } + }); + context.start(); + + MockEndpoint a = getMockEndpoint("mock:a"); + a.expectedMessageCount(1); + template.sendBody("direct:a", new ByteArrayInputStream("Hello World".getBytes())); + assertMockEndpointsSatisfied(); + // should be converted + assertTrue(a.getReceivedExchanges().get(0).getMessage().getBody() instanceof StreamCache); + + assertEquals("Hello World", a.assertExchangeReceived(0).getIn().getBody(String.class)); + + // reset + a.reset(); + a.expectedMessageCount(1); + template.sendBody("direct:a", new StringReader("Bye World")); + assertMockEndpointsSatisfied(); + // should not be converted + 
assertFalse(a.getReceivedExchanges().get(0).getMessage().getBody() instanceof StreamCache); + assertEquals("Bye World", a.assertExchangeReceived(0).getIn().getBody(String.class)); + } + + @Test + public void testDeny() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + context.getStreamCachingStrategy().setDenyClasses(ByteArrayInputStream.class); + from("direct:a").to("mock:a"); + } + }); + context.start(); + + MockEndpoint a = getMockEndpoint("mock:a"); + a.expectedMessageCount(1); + template.sendBody("direct:a", new ByteArrayInputStream("Hello World".getBytes())); + assertMockEndpointsSatisfied(); + // should NOT be converted + assertFalse(a.getReceivedExchanges().get(0).getMessage().getBody() instanceof StreamCache); + + assertEquals("Hello World", a.assertExchangeReceived(0).getIn().getBody(String.class)); + + // reset + a.reset(); + a.expectedMessageCount(1); + template.sendBody("direct:a", new StringReader("Bye World")); + assertMockEndpointsSatisfied(); + // should be converted + assertTrue(a.getReceivedExchanges().get(0).getMessage().getBody() instanceof StreamCache); + assertEquals("Bye World", a.assertExchangeReceived(0).getIn().getBody(String.class)); + } + + @Test + public void testAllowAndDeny() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + context.getStreamCachingStrategy().setAllowClasses(ByteArrayInputStream.class); + context.getStreamCachingStrategy().setDenyClasses(Reader.class); + from("direct:a").to("mock:a"); + } + }); + context.start(); + + MockEndpoint a = getMockEndpoint("mock:a"); + a.expectedMessageCount(1); + template.sendBody("direct:a", new ByteArrayInputStream("Hello World".getBytes())); + assertMockEndpointsSatisfied(); + // should be converted + assertTrue(a.getReceivedExchanges().get(0).getMessage().getBody() instanceof StreamCache); + + assertEquals("Hello World", 
a.assertExchangeReceived(0).getIn().getBody(String.class)); + + // reset + a.reset(); + a.expectedMessageCount(1); + template.sendBody("direct:a", new StringReader("Bye World")); + assertMockEndpointsSatisfied(); + // should not be converted + assertFalse(a.getReceivedExchanges().get(0).getMessage().getBody() instanceof StreamCache); + assertEquals("Bye World", a.assertExchangeReceived(0).getIn().getBody(String.class)); + } + + @Test + public void testDualDeny() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + context.getStreamCachingStrategy().setDenyClasses(ByteArrayInputStream.class, Reader.class); + from("direct:a").to("mock:a"); + } + }); + context.start(); + + MockEndpoint a = getMockEndpoint("mock:a"); + a.expectedMessageCount(1); + template.sendBody("direct:a", new ByteArrayInputStream("Hello World".getBytes())); + assertMockEndpointsSatisfied(); + // should NOT be converted + assertFalse(a.getReceivedExchanges().get(0).getMessage().getBody() instanceof StreamCache); + + assertEquals("Hello World", a.assertExchangeReceived(0).getIn().getBody(String.class)); + + // reset + a.reset(); + a.expectedMessageCount(1); + template.sendBody("direct:a", new StringReader("Bye World")); + assertMockEndpointsSatisfied(); + // should not be converted + assertFalse(a.getReceivedExchanges().get(0).getMessage().getBody() instanceof StreamCache); + assertEquals("Bye World", a.assertExchangeReceived(0).getIn().getBody(String.class)); + } + +} diff --git a/core/camel-main/src/generated/java/org/apache/camel/main/MainConfigurationPropertiesConfigurer.java b/core/camel-main/src/generated/java/org/apache/camel/main/MainConfigurationPropertiesConfigurer.java index ec85a870243..43077813899 100644 --- a/core/camel-main/src/generated/java/org/apache/camel/main/MainConfigurationPropertiesConfigurer.java +++ b/core/camel-main/src/generated/java/org/apache/camel/main/MainConfigurationPropertiesConfigurer.java @@ 
-215,10 +215,14 @@ public class MainConfigurationPropertiesConfigurer extends org.apache.camel.supp case "StartupRecorderRecording": target.setStartupRecorderRecording(property(camelContext, boolean.class, value)); return true; case "startupsummarylevel": case "StartupSummaryLevel": target.setStartupSummaryLevel(property(camelContext, org.apache.camel.StartupSummaryLevel.class, value)); return true; + case "streamcachingallowclasses": + case "StreamCachingAllowClasses": target.setStreamCachingAllowClasses(property(camelContext, java.lang.String.class, value)); return true; case "streamcachinganyspoolrules": case "StreamCachingAnySpoolRules": target.setStreamCachingAnySpoolRules(property(camelContext, boolean.class, value)); return true; case "streamcachingbuffersize": case "StreamCachingBufferSize": target.setStreamCachingBufferSize(property(camelContext, int.class, value)); return true; + case "streamcachingdenyclasses": + case "StreamCachingDenyClasses": target.setStreamCachingDenyClasses(property(camelContext, java.lang.String.class, value)); return true; case "streamcachingenabled": case "StreamCachingEnabled": target.setStreamCachingEnabled(property(camelContext, boolean.class, value)); return true; case "streamcachingremovespooldirectorywhenstopping": @@ -460,10 +464,14 @@ public class MainConfigurationPropertiesConfigurer extends org.apache.camel.supp case "StartupRecorderRecording": return boolean.class; case "startupsummarylevel": case "StartupSummaryLevel": return org.apache.camel.StartupSummaryLevel.class; + case "streamcachingallowclasses": + case "StreamCachingAllowClasses": return java.lang.String.class; case "streamcachinganyspoolrules": case "StreamCachingAnySpoolRules": return boolean.class; case "streamcachingbuffersize": case "StreamCachingBufferSize": return int.class; + case "streamcachingdenyclasses": + case "StreamCachingDenyClasses": return java.lang.String.class; case "streamcachingenabled": case "StreamCachingEnabled": return 
boolean.class; case "streamcachingremovespooldirectorywhenstopping": @@ -706,10 +714,14 @@ public class MainConfigurationPropertiesConfigurer extends org.apache.camel.supp case "StartupRecorderRecording": return target.isStartupRecorderRecording(); case "startupsummarylevel": case "StartupSummaryLevel": return target.getStartupSummaryLevel(); + case "streamcachingallowclasses": + case "StreamCachingAllowClasses": return target.getStreamCachingAllowClasses(); case "streamcachinganyspoolrules": case "StreamCachingAnySpoolRules": return target.isStreamCachingAnySpoolRules(); case "streamcachingbuffersize": case "StreamCachingBufferSize": return target.getStreamCachingBufferSize(); + case "streamcachingdenyclasses": + case "StreamCachingDenyClasses": return target.getStreamCachingDenyClasses(); case "streamcachingenabled": case "StreamCachingEnabled": return target.isStreamCachingEnabled(); case "streamcachingremovespooldirectorywhenstopping": diff --git a/core/camel-main/src/generated/resources/META-INF/camel-main-configuration-metadata.json b/core/camel-main/src/generated/resources/META-INF/camel-main-configuration-metadata.json index 89497d39597..76b3094e958 100644 --- a/core/camel-main/src/generated/resources/META-INF/camel-main-configuration-metadata.json +++ b/core/camel-main/src/generated/resources/META-INF/camel-main-configuration-metadata.json @@ -109,8 +109,10 @@ { "name": "camel.main.startupRecorderProfile", "description": "To use a specific Java Flight Recorder profile configuration, such as default or profile. The default is default.", "sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type": "string", "javaType": "java.lang.String", "defaultValue": "default" }, { "name": "camel.main.startupRecorderRecording", "description": "To enable Java Flight Recorder to start a recording and automatic dump the recording to disk after startup is complete. 
This requires that camel-jfr is on the classpath, and to enable this option.", "sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type": "boolean", "javaType": "boolean", "defaultValue": "false" }, { "name": "camel.main.startupSummaryLevel", "description": "Controls the level of information logged during startup (and shutdown) of CamelContext.", "sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type": "object", "javaType": "org.apache.camel.StartupSummaryLevel", "defaultValue": "Default" }, + { "name": "camel.main.streamCachingAllowClasses", "description": "To filter stream caching of a given set of allowed\/denied classes. By default, all classes that are java.io.InputStream is allowed. Multiple class names can be separated by comma.", "sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type": "string", "javaType": "java.lang.String" }, { "name": "camel.main.streamCachingAnySpoolRules", "description": "Sets whether if just any of the org.apache.camel.spi.StreamCachingStrategy.SpoolRule rules returns true then shouldSpoolCache(long) returns true, to allow spooling to disk. If this option is false, then all the org.apache.camel.spi.StreamCachingStrategy.SpoolRule must return true. The default value is false which means that all the rules must return true.", "sourceType": "org.apache.camel.main.DefaultConfigurationProp [...] { "name": "camel.main.streamCachingBufferSize", "description": "Sets the stream caching buffer size to use when allocating in-memory buffers used for in-memory stream caches. The default size is 4096.", "sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type": "integer", "javaType": "int" }, + { "name": "camel.main.streamCachingDenyClasses", "description": "To filter stream caching of a given set of allowed\/denied classes. By default, all classes that are java.io.InputStream is allowed. 
Multiple class names can be separated by comma.", "sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type": "string", "javaType": "java.lang.String" }, { "name": "camel.main.streamCachingEnabled", "description": "Sets whether stream caching is enabled or not. While stream types (like StreamSource, InputStream and Reader) are commonly used in messaging for performance reasons, they also have an important drawback: they can only be read once. In order to be able to work with message content multiple times, the stream needs to be cached. Streams are cached in memory only (by default). If streamCachingSpoolEnabled=true, then, for large [...] { "name": "camel.main.streamCachingRemoveSpoolDirectoryWhenStopping", "description": "Whether to remove stream caching temporary directory when stopping. This option is default true.", "sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type": "boolean", "javaType": "boolean", "defaultValue": true }, { "name": "camel.main.streamCachingSpoolCipher", "description": "Sets a stream caching cipher name to use when spooling to disk to write with encryption. By default the data is not encrypted.", "sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type": "string", "javaType": "java.lang.String" }, diff --git a/core/camel-main/src/main/docs/main.adoc b/core/camel-main/src/main/docs/main.adoc index c6e802a04fa..d5f75297beb 100644 --- a/core/camel-main/src/main/docs/main.adoc +++ b/core/camel-main/src/main/docs/main.adoc @@ -18,7 +18,7 @@ The following tables lists all the options: // main options: START === Camel Main configurations -The camel.main supports 119 options, which are listed below. +The camel.main supports 121 options, which are listed below. [width="100%",cols="2,5,^1,2",options="header"] |=== @@ -120,8 +120,10 @@ The camel.main supports 119 options, which are listed below. 
| *camel.main.startupRecorder{zwsp}Profile* | To use a specific Java Flight Recorder profile configuration, such as default or profile. The default is default. | default | String | *camel.main.startupRecorder{zwsp}Recording* | To enable Java Flight Recorder to start a recording and automatic dump the recording to disk after startup is complete. This requires that camel-jfr is on the classpath, and to enable this option. | false | boolean | *camel.main.startupSummaryLevel* | Controls the level of information logged during startup (and shutdown) of CamelContext. | Default | StartupSummaryLevel +| *camel.main.streamCachingAllow{zwsp}Classes* | To filter stream caching of a given set of allowed/denied classes. By default, all classes that are java.io.InputStream are allowed. Multiple class names can be separated by comma. | | String | *camel.main.streamCachingAny{zwsp}SpoolRules* | Sets whether if just any of the org.apache.camel.spi.StreamCachingStrategy.SpoolRule rules returns true then shouldSpoolCache(long) returns true, to allow spooling to disk. If this option is false, then all the org.apache.camel.spi.StreamCachingStrategy.SpoolRule must return true. The default value is false which means that all the rules must return true. | false | boolean | *camel.main.streamCachingBuffer{zwsp}Size* | Sets the stream caching buffer size to use when allocating in-memory buffers used for in-memory stream caches. The default size is 4096. | | int +| *camel.main.streamCachingDeny{zwsp}Classes* | To filter stream caching of a given set of allowed/denied classes. By default, all classes that are java.io.InputStream are allowed. Multiple class names can be separated by comma. | | String | *camel.main.streamCaching{zwsp}Enabled* | Sets whether stream caching is enabled or not. While stream types (like StreamSource, InputStream and Reader) are commonly used in messaging for performance reasons, they also have an important drawback: they can only be read once. 
In order to be able to work with message content multiple times, the stream needs to be cached. Streams are cached in memory only (by default). If streamCachingSpoolEnabled=true, then, for large stream messages (over [...] | *camel.main.streamCachingRemove{zwsp}SpoolDirectoryWhenStopping* | Whether to remove stream caching temporary directory when stopping. This option is default true. | true | boolean | *camel.main.streamCachingSpool{zwsp}Cipher* | Sets a stream caching cipher name to use when spooling to disk to write with encryption. By default the data is not encrypted. | | String diff --git a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java index eff98206d8d..0e7a32628a6 100644 --- a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java +++ b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java @@ -184,6 +184,8 @@ public final class DefaultConfigurationConfigurer { // stream caching camelContext.setStreamCaching(config.isStreamCachingEnabled()); + camelContext.getStreamCachingStrategy().setAllowClasses(config.getStreamCachingAllowClasses()); + camelContext.getStreamCachingStrategy().setDenyClasses(config.getStreamCachingDenyClasses()); camelContext.getStreamCachingStrategy().setSpoolEnabled(config.isStreamCachingSpoolEnabled()); camelContext.getStreamCachingStrategy().setAnySpoolRules(config.isStreamCachingAnySpoolRules()); camelContext.getStreamCachingStrategy().setBufferSize(config.getStreamCachingBufferSize()); diff --git a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java index 37bc3b82a11..ae611459929 100644 --- a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java +++ 
b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java @@ -59,6 +59,8 @@ public abstract class DefaultConfigurationProperties<T> { private boolean modeline; private int logDebugMaxChars; private boolean streamCachingEnabled = true; + private String streamCachingAllowClasses; + private String streamCachingDenyClasses; private boolean streamCachingSpoolEnabled; private String streamCachingSpoolDirectory; private String streamCachingSpoolCipher; @@ -463,6 +465,30 @@ public abstract class DefaultConfigurationProperties<T> { this.streamCachingEnabled = streamCachingEnabled; } + public String getStreamCachingAllowClasses() { + return streamCachingAllowClasses; + } + + /** + * To filter stream caching of a given set of allowed/denied classes. By default, all classes that are + * {@link java.io.InputStream} are allowed. Multiple class names can be separated by comma. + */ + public void setStreamCachingAllowClasses(String streamCachingAllowClasses) { + this.streamCachingAllowClasses = streamCachingAllowClasses; + } + + public String getStreamCachingDenyClasses() { + return streamCachingDenyClasses; + } + + /** + * To filter stream caching of a given set of allowed/denied classes. By default, all classes that are + * {@link java.io.InputStream} are allowed. Multiple class names can be separated by comma. + */ + public void setStreamCachingDenyClasses(String streamCachingDenyClasses) { + this.streamCachingDenyClasses = streamCachingDenyClasses; + } + public boolean isStreamCachingSpoolEnabled() { return streamCachingSpoolEnabled; } @@ -1854,6 +1880,24 @@ public abstract class DefaultConfigurationProperties<T> { return (T) this; } + /** + * To filter stream caching of a given set of allowed/denied classes. By default, all classes that are + * {@link java.io.InputStream} are allowed. Multiple class names can be separated by comma. 
+ */ + public T withStreamCachingAllowClasses(String streamCachingAllowClasses) { + this.streamCachingAllowClasses = streamCachingAllowClasses; + return (T) this; + } + + /** + * To filter stream caching of a given set of allowed/denied classes. By default, all classes that are + * {@link java.io.InputStream} are allowed. Multiple class names can be separated by comma. + */ + public T withStreamCachingDenyClasses(String streamCachingDenyClasses) { + this.streamCachingDenyClasses = streamCachingDenyClasses; + return (T) this; + } + /** * To enable stream caching spooling to disk. This means, for large stream messages (over 128 KB by default) will be * cached in a temporary file instead, and Camel will handle deleting the temporary file once the cached stream is diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java index c420d1eb357..ff16b2da1c5 100644 --- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java +++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java @@ -32,6 +32,12 @@ public interface ManagedStreamCachingStrategyMBean extends ManagedServiceMBean { @ManagedAttribute(description = "Whether stream caching is enabled") boolean isEnabled(); + @ManagedAttribute(description = "To filter stream caching of a given set of allowed/denied classes.") + String[] getAllowClasses(); + + @ManagedAttribute(description = "To filter stream caching of a given set of allowed/denied classes.") + String[] getDenyClasses(); + @ManagedAttribute(description = "Whether spooling to disk enabled") boolean isSpoolEnabled(); diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java 
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java index 7249fa4653e..74dc57d2e05 100644 --- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java +++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java @@ -26,11 +26,23 @@ public class ManagedStreamCachingStrategy extends ManagedService implements Mana private final CamelContext camelContext; private final StreamCachingStrategy streamCachingStrategy; + private final String[] allowClasses; + private final String[] denyClasses; public ManagedStreamCachingStrategy(CamelContext camelContext, StreamCachingStrategy streamCachingStrategy) { super(camelContext, streamCachingStrategy); this.camelContext = camelContext; this.streamCachingStrategy = streamCachingStrategy; + if (streamCachingStrategy.getAllowClasses() != null) { + this.allowClasses = streamCachingStrategy.getAllowClasses().toArray(new String[0]); + } else { + this.allowClasses = null; + } + if (streamCachingStrategy.getDenyClasses() != null) { + this.denyClasses = streamCachingStrategy.getDenyClasses().toArray(new String[0]); + } else { + this.denyClasses = null; + } } public CamelContext getCamelContext() { @@ -46,6 +58,16 @@ public class ManagedStreamCachingStrategy extends ManagedService implements Mana return streamCachingStrategy.isEnabled(); } + @Override + public String[] getAllowClasses() { + return allowClasses; + } + + @Override + public String[] getDenyClasses() { + return denyClasses; + } + @Override public boolean isSpoolEnabled() { return streamCachingStrategy.isSpoolEnabled();