This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 810e133ee20 CAMEL-19749: Variables should reset stream cache before use so they are more easy to use for end users. 810e133ee20 is described below commit 810e133ee2023e0678ba098ceba3d6cfabb60920 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Fri Jan 19 10:23:13 2024 +0100 CAMEL-19749: Variables should reset stream cache before use so they are more easy to use for end users. --- .../org/apache/camel/spi/VariableRepository.java | 6 +- .../camel/processor/StreamCachingVariableTest.java | 69 ++++++++++++++++++++++ .../camel/support/ExchangeVariableRepository.java | 8 ++- .../camel/support/GlobalVariableRepository.java | 8 ++- 4 files changed, 88 insertions(+), 3 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/VariableRepository.java b/core/camel-api/src/main/java/org/apache/camel/spi/VariableRepository.java index 91d78bfa52f..153b3502135 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/VariableRepository.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/VariableRepository.java @@ -29,7 +29,11 @@ public interface VariableRepository extends StaticService { String getId(); /** - * Returns a variable by name + * Returns a variable by name. + * + * If the variable is of type {@link org.apache.camel.StreamCache} then the repository should ensure to reset the + * stream cache before returning the value, to ensure the content can be read by the Camel end user and would be + * re-readable next time. * * @param name the name of the variable * @return the value of the given variable or <tt>null</tt> if there is no variable for the given name diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/StreamCachingVariableTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/StreamCachingVariableTest.java new file mode 100644 index 00000000000..c38a2256230 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/StreamCachingVariableTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.io.File; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.converter.stream.FileInputStreamCache; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class StreamCachingVariableTest extends ContextTestSupport { + + private static final String TEST_FILE = "src/test/resources/org/apache/camel/util/foo.txt"; + + @Test + public void testStreamCaching() throws Exception { + // exchange scoped + getMockEndpoint("mock:result").expectedBodiesReceived("foo"); + File file = new File(TEST_FILE); + FileInputStreamCache cache = new FileInputStreamCache(file); + template.sendBody("direct:start", cache); + assertMockEndpointsSatisfied(); + + // global scoped + cache = new FileInputStreamCache(file); + context.setVariable("myKey2", cache); + + String data = context.getVariable("myKey2", String.class); + Assertions.assertEquals("foo", data); + data = context.getVariable("myKey2", String.class); + Assertions.assertEquals("foo", data); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .setVariable("myKey", simple("${body}")) + .process(e -> { + String data = e.getVariable("myKey", String.class); + Assertions.assertEquals("foo", data); + }) + .process(e -> { + String data = e.getVariable("myKey", String.class); + Assertions.assertEquals("foo", data); + }) + .to("mock:result"); + } + }; + } +} diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeVariableRepository.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeVariableRepository.java index 3e0a3fd3192..14f74be0760 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeVariableRepository.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeVariableRepository.java @@ -22,6 +22,7 @@ import java.util.stream.Stream; import org.apache.camel.Exchange; import org.apache.camel.NonManagedService; +import org.apache.camel.StreamCache; import org.apache.camel.spi.BrowsableVariableRepository; import org.apache.camel.spi.VariableRepository; import org.apache.camel.support.service.ServiceSupport; @@ -40,7 +41,12 @@ class ExchangeVariableRepository extends ServiceSupport implements BrowsableVari @Override public Object getVariable(String name) { - return variables.get(name); + Object answer = variables.get(name); + if (answer instanceof StreamCache sc) { + // reset so the cache is ready to be used as a variable + sc.reset(); + } + return answer; } @Override diff --git a/core/camel-support/src/main/java/org/apache/camel/support/GlobalVariableRepository.java b/core/camel-support/src/main/java/org/apache/camel/support/GlobalVariableRepository.java index daad901fde1..09f1e7d7270 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/GlobalVariableRepository.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/GlobalVariableRepository.java @@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.stream.Stream; +import org.apache.camel.StreamCache; import org.apache.camel.spi.BrowsableVariableRepository; import org.apache.camel.spi.VariableRepository; import org.apache.camel.support.service.ServiceSupport; @@ -39,7 +40,12 @@ public final class GlobalVariableRepository extends ServiceSupport implements Br @Override public Object getVariable(String name) { - return variables.get(name); + Object answer = variables.get(name); + if (answer instanceof StreamCache sc) { + // reset so the cache is ready to be used as a variable + sc.reset(); + } + return answer; } @Override