This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 302128a CAMEL-14972: Enricher does not reset stream cache for aggregator (#3780) 302128a is described below commit 302128a9f7bd018dc639721b2dde4b94a5d04b52 Author: forsthofer <forstho...@users.noreply.github.com> AuthorDate: Tue Apr 28 07:19:02 2020 +0200 CAMEL-14972: Enricher does not reset stream cache for aggregator (#3780) Co-authored-by: Franz Forsthofer <franz.forstho...@sap.com> --- .../java/org/apache/camel/processor/Enricher.java | 3 + .../enricher/EnricherAggregateStreamingTest.java | 126 +++++++++++++++++++++ 2 files changed, 129 insertions(+) diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java b/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java index 186ff1c..dcd87bb 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java @@ -42,6 +42,7 @@ import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.DefaultExchange; import org.apache.camel.support.EventHelper; import org.apache.camel.support.ExchangeHelper; +import org.apache.camel.support.MessageHelper; import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.util.StopWatch; import org.slf4j.Logger; @@ -237,6 +238,7 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA try { // prepare the exchanges for aggregation ExchangeHelper.prepareAggregation(exchange, resourceExchange); + MessageHelper.resetStreamCache(exchange.getIn()); Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange); if (aggregatedExchange != null) { @@ -298,6 +300,7 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA try { // prepare the exchanges for aggregation ExchangeHelper.prepareAggregation(exchange, resourceExchange); + MessageHelper.resetStreamCache(exchange.getIn()); Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange); if (aggregatedExchange != null) { diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherAggregateStreamingTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherAggregateStreamingTest.java new file mode 100644 index 0000000..a39674b --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherAggregateStreamingTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.enricher; + +import java.io.InputStream; +import java.nio.charset.StandardCharsets; + +import org.apache.camel.AggregationStrategy; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.converter.stream.CachedOutputStream; +import org.apache.camel.spi.Registry; +import org.apache.camel.spi.StreamCachingStrategy; +import org.junit.Test; + +/** + * The original message body is a StreamCache. Therefore the stream cache must + * be reset before the aggregator is called. + */ +public class EnricherAggregateStreamingTest extends ContextTestSupport { + + @Test + public void testStream() throws Exception { + + getMockEndpoint("mock:result").expectedBodiesReceived("Old Body New Body"); + + template.sendBody("direct:start", ""); + + assertMockEndpointsSatisfied(); + } + + @Test + public void testStreamSync() throws Exception { + + getMockEndpoint("mock:result").expectedBodiesReceived("Old Body New Body"); + + template.sendBody("direct:startSync", ""); + + assertMockEndpointsSatisfied(); + } + + @Override + protected Registry createRegistry() throws Exception { + Registry jndi = super.createRegistry(); + jndi.bind("b1", new MyProcessor()); // for synchronous call + return jndi; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + getContext().setStreamCaching(Boolean.TRUE); + StreamCachingStrategy scs = getContext().getStreamCachingStrategy(); + scs.setSpoolThreshold(1l); + scs.setSpoolDirectory("target/streamcache/"); + from("direct:start").process(new StreamProcessor()).enrich("direct:foo", new MyAggregationStrategy(), false) + .to("mock:result"); + + from("direct:foo").bean(new MyProcessor()); + + from("direct:startSync").process(new StreamProcessor()).enrich().simple("bean:b1") + .aggregationStrategy(new MyAggregationStrategy()).to("mock:result"); + + } + }; + } + + public static class StreamProcessor implements Processor { + + @Override + public void process(Exchange exchange) throws Exception { + // consume stream by reading the body as string + try (CachedOutputStream os = new CachedOutputStream(exchange)) { + + os.write("Old Body ".getBytes(StandardCharsets.UTF_8)); + + InputStream is = os.getInputStream(); + exchange.getIn().setBody(is); + } + } + } + + public static class MyProcessor implements Processor { + + @Override + public void process(Exchange exchange) throws Exception { + // consume stream by reading the body as string + exchange.getIn().getBody(String.class); + exchange.getIn().setBody("New Body"); + } + } + + public static class MyAggregationStrategy implements AggregationStrategy { + + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + // consume stream by reading the body as string + String oldbody = oldExchange.getIn().getBody(String.class); + + String newbody = newExchange.getIn().getBody(String.class); + + // replace body + oldExchange.getIn().setBody(oldbody + newbody); + return oldExchange; + } + } + +}