CAMEL-9979: Bean parameter binding should reset stream cache before evaluate message body
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a6f04063 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a6f04063 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a6f04063 Branch: refs/heads/master Commit: a6f040636f4d8959efbbaa701cd8d666df9c97b9 Parents: ca65626 Author: Claus Ibsen <davscl...@apache.org> Authored: Sat May 21 11:50:26 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat May 21 12:21:32 2016 +0200 ---------------------------------------------------------------------- .../apache/camel/component/bean/MethodInfo.java | 12 ++++ .../BeanParameterBindingStreamCachingTest.java | 60 ++++++++++++++++++++ .../jsonpath/JsonPathBeanStreamCachingTest.java | 60 ++++++++++++++++++++ 3 files changed, 132 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/a6f04063/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java b/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java index 5b21409..573ee9e 100644 --- a/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java +++ b/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java @@ -40,6 +40,7 @@ import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.Pattern; import org.apache.camel.Processor; import org.apache.camel.RuntimeExchangeException; +import org.apache.camel.StreamCache; import org.apache.camel.processor.DynamicRouter; import org.apache.camel.processor.RecipientList; import org.apache.camel.processor.RoutingSlip; @@ -248,6 +249,11 @@ public class MethodInfo { } public boolean proceed(AsyncCallback callback) { + Object body = exchange.getIn().getBody(); + if (body != null && body instanceof StreamCache) { + // ensure the stream cache is reset before calling the method + ((StreamCache) body).reset(); + } try { return doProceed(callback); } catch (InvocationTargetException e) { @@ -467,6 +473,12 @@ public class MethodInfo { exchange.getIn().removeHeader(Exchange.BEAN_METHOD_NAME); for (int i = 0; i < expressions.length; i++) { + + if (body != null && body instanceof StreamCache) { + // need to reset stream cache for each expression as you may access the message body in multiple parameters + ((StreamCache) body).reset(); + } + // grab the parameter value for the given index Object parameterValue = it != null && it.hasNext() ? it.next() : null; // and the expected parameter type http://git-wip-us.apache.org/repos/asf/camel/blob/a6f04063/camel-core/src/test/java/org/apache/camel/component/bean/BeanParameterBindingStreamCachingTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/bean/BeanParameterBindingStreamCachingTest.java b/camel-core/src/test/java/org/apache/camel/component/bean/BeanParameterBindingStreamCachingTest.java new file mode 100644 index 0000000..ca5bd24 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/bean/BeanParameterBindingStreamCachingTest.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.bean; + +import java.io.ByteArrayInputStream; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.language.Simple; + +/** + * @version + */ +public class BeanParameterBindingStreamCachingTest extends ContextTestSupport { + + public void testBean() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("abcABC"); + + template.sendBody("direct:start", new ByteArrayInputStream("aBc".getBytes())); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").streamCaching() + .bean(Foo.class) + .to("mock:result"); + } + }; + } + + @SuppressWarnings("unused") + public static class Foo { + + public static String hello(@Simple("${body}") String body, + @Simple("${body}") String body2) { + // we should be able to read the stream multiple times + return body.toLowerCase() + body2.toUpperCase(); + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a6f04063/components/camel-jsonpath/src/test/java/org/apache/camel/jsonpath/JsonPathBeanStreamCachingTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jsonpath/src/test/java/org/apache/camel/jsonpath/JsonPathBeanStreamCachingTest.java b/components/camel-jsonpath/src/test/java/org/apache/camel/jsonpath/JsonPathBeanStreamCachingTest.java new file mode 100644 index 0000000..bdfad96 --- /dev/null +++ b/components/camel-jsonpath/src/test/java/org/apache/camel/jsonpath/JsonPathBeanStreamCachingTest.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.jsonpath; + +import java.io.ByteArrayInputStream; + +import com.jayway.jsonpath.Option; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class JsonPathBeanStreamCachingTest extends CamelTestSupport { + + @Test + public void testFullName() throws Exception { + String json = "{\"person\" : {\"firstname\" : \"foo\", \"middlename\" : \"foo2\", \"lastname\" : \"bar\"}}"; + getMockEndpoint("mock:result").expectedBodiesReceived("foo foo2 bar"); + + template.sendBody("direct:start", new ByteArrayInputStream(json.getBytes())); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start").streamCaching() + .bean(FullNameBean.class).to("mock:result"); + } + }; + } + + protected static class FullNameBean { + // middle name is optional + public static String getName(@JsonPath("person.firstname") String first, + @JsonPath(value = "person.middlename", options = Option.SUPPRESS_EXCEPTIONS) String middle, + @JsonPath("person.lastname") String last) { + if (middle != null) { + return first + " " + middle + " " + last; + } + return first + " " + last; + } + } +}