CAMEL-7787: Multicast - Should defer completing the unit of work (UoW) until after the aggregation has been done. Thanks to Franz Forsthofer for the patch.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0ae44a18 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0ae44a18 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0ae44a18 Branch: refs/heads/camel-2.14.x Commit: 0ae44a185ca64ab1d4143fba94c7d8d9e6c98499 Parents: 5c67207 Author: Claus Ibsen <davscl...@apache.org> Authored: Sun Dec 21 14:18:05 2014 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Dec 21 16:10:43 2014 +0100 ---------------------------------------------------------------------- .../main/java/org/apache/camel/Exchange.java | 1 + .../converter/stream/CachedOutputStream.java | 21 +++- .../camel/processor/MulticastProcessor.java | 9 ++ .../org/apache/camel/processor/Splitter.java | 10 ++ .../MultiCastStreamCachingInSubRouteTest.java | 120 ++++++++++++++++++ .../SplitterStreamCachingInSubRouteTest.java | 126 +++++++++++++++++++ 6 files changed, 284 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/0ae44a18/camel-core/src/main/java/org/apache/camel/Exchange.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/Exchange.java b/camel-core/src/main/java/org/apache/camel/Exchange.java index a33253a..ab12c89 100644 --- a/camel-core/src/main/java/org/apache/camel/Exchange.java +++ b/camel-core/src/main/java/org/apache/camel/Exchange.java @@ -167,6 +167,7 @@ public interface Exchange { String OVERRULE_FILE_NAME = "CamelOverruleFileName"; String PARENT_UNIT_OF_WORK = "CamelParentUnitOfWork"; + String STREAM_CACHE_UNIT_OF_WORK = "CamelStreamCacheUnitOfWork"; String RECIPIENT_LIST_ENDPOINT = "CamelRecipientListEndpoint"; String RECEIVED_TIMESTAMP = "CamelReceivedTimestamp"; 
http://git-wip-us.apache.org/repos/asf/camel/blob/0ae44a18/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java index 63cedc3..616b2ed 100644 --- a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java +++ b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java @@ -25,11 +25,14 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.security.GeneralSecurityException; + import javax.crypto.CipherOutputStream; import org.apache.camel.Exchange; import org.apache.camel.StreamCache; import org.apache.camel.spi.StreamCachingStrategy; +import org.apache.camel.spi.Synchronization; +import org.apache.camel.spi.UnitOfWork; import org.apache.camel.support.SynchronizationAdapter; import org.apache.camel.util.FileUtil; import org.apache.camel.util.ObjectHelper; @@ -78,7 +81,7 @@ public class CachedOutputStream extends OutputStream { currentStream = new CachedByteArrayOutputStream(strategy.getBufferSize()); if (closedOnCompletion) { // add on completion so we can cleanup after the exchange is done such as deleting temporary files - exchange.addOnCompletion(new SynchronizationAdapter() { + Synchronization onCompletion = new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { try { @@ -95,12 +98,24 @@ public class CachedOutputStream extends OutputStream { LOG.warn("Error closing streams. 
This exception will be ignored.", e); } } - + @Override public String toString() { return "OnCompletion[CachedOutputStream]"; } - }); + }; + + UnitOfWork streamCacheUnitOfWork = exchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, UnitOfWork.class); + if (streamCacheUnitOfWork != null) { + // The stream cache must sometimes not be closed when the exchange is deleted. This is for example the + // case in the splitter and multi-cast case with AggregationStrategy where the result of the sub-routes + // are aggregated later in the main route. Here, the cached streams of the sub-routes must be closed with + // the Unit of Work of the main route. + streamCacheUnitOfWork.addSynchronization(onCompletion); + } else { + // add on completion so we can cleanup after the exchange is done such as deleting temporary files + exchange.addOnCompletion(onCompletion); + } } } http://git-wip-us.apache.org/repos/asf/camel/blob/0ae44a18/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java index 38e70bb..6c1a54c 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -864,6 +864,15 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor // copy exchange, and do not share the unit of work Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false); + // If the multi-cast processor has an aggregation strategy + // then the StreamCache created by the child routes must not be + // closed by the unit of work of the child route, but by the unit of + // work of the parent route or grand parent route or grand grand parent route ...(in case of nesting). 
+ // Set therefore the unit of work of the parent route as stream cache unit of work, + // if it is not already set. + if (copy.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK) == null) { + copy.setProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, exchange.getUnitOfWork()); + } // if we share unit of work, we need to prepare the child exchange if (isShareUnitOfWork()) { prepareSharedUnitOfWork(copy, exchange); http://git-wip-us.apache.org/repos/asf/camel/blob/0ae44a18/camel-core/src/main/java/org/apache/camel/processor/Splitter.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java index 314de20..ec9b258 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java +++ b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java @@ -139,8 +139,10 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac final Iterator<?> iterator; private final Exchange copy; private final RouteContext routeContext; + private final Exchange original; private SplitterIterable(Exchange exchange, Object value) { + this.original = exchange; this.value = value; this.iterator = ObjectHelper.createIterator(value); this.copy = copyExchangeNoAttachments(exchange, true); @@ -177,6 +179,14 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac // create a correlated copy as the new exchange to be routed in the splitter from the copy // and do not share the unit of work Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false); + // If the splitter has an aggregation strategy + // then the StreamCache created by the child routes must not be + // closed by the unit of work of the child route, but by the unit of + // work of the parent route or grand parent route or grand grand parent route... (in case of nesting). 
+ // Therefore, set the unit of work of the parent route as stream cache unit of work, if not already set. + if (newExchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK) == null) { + newExchange.setProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, original.getUnitOfWork()); + } // if we share unit of work, we need to prepare the child exchange if (isShareUnitOfWork()) { prepareSharedUnitOfWork(newExchange, copy); http://git-wip-us.apache.org/repos/asf/camel/blob/0ae44a18/camel-core/src/test/java/org/apache/camel/processor/MultiCastStreamCachingInSubRouteTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/MultiCastStreamCachingInSubRouteTest.java b/camel-core/src/test/java/org/apache/camel/processor/MultiCastStreamCachingInSubRouteTest.java new file mode 100644 index 0000000..aa8da23 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/MultiCastStreamCachingInSubRouteTest.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.converter.stream.CachedOutputStream; +import org.apache.camel.processor.aggregate.AggregationStrategy; + +public class MultiCastStreamCachingInSubRouteTest extends ContextTestSupport { + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + context.setStreamCaching(true); + context.getStreamCachingStrategy().setEnabled(true); + context.getStreamCachingStrategy().setSpoolDirectory("target/camel/cache"); + context.getStreamCachingStrategy().setSpoolThreshold(1L); + + from("direct:start").multicast(new InternalAggregationStrategy()).to("direct:a", "direct:b").end().to("mock:result"); + + from("direct:startNestedMultiCast").multicast(new InternalAggregationStrategy()).to("direct:start").end() + .to("mock:resultNested"); + + from("direct:a") // + .process(new InputProcessorWithStreamCache(1)) // + .to("mock:resulta"); + + from("direct:b") // + .process(new InputProcessorWithStreamCache(2)) // + .to("mock:resultb"); + } + }; + } + + public void testWithAggregationStrategyAndStreamCacheInSubRoute() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("Test Message 1Test Message 2"); + template.sendBody("direct:start", "<start></start>"); + + assertMockEndpointsSatisfied(); + } + + public void testNestedMultiCastWithCachedStreamInAggregationStrategy() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:resultNested"); + mock.expectedBodiesReceived("Test Message 1Test Message 2"); + template.sendBody("direct:startNestedMultiCast", 
"<start></start>"); + + assertMockEndpointsSatisfied(); + } + + public static class InputProcessorWithStreamCache implements Processor { + + private final int number; + + public InputProcessorWithStreamCache(int number) { + this.number = number; + } + + @Override + public void process(Exchange exchange) throws Exception { + + CachedOutputStream cos = new CachedOutputStream(exchange); + String s = "Test Message " + number; + cos.write(s.getBytes(Charset.forName("UTF-8"))); + cos.close(); + InputStream is = (InputStream) cos.newStreamCache(); + exchange.getOut().setBody(is); + + } + } + + public static class InternalAggregationStrategy implements AggregationStrategy { + + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + if (oldExchange == null) { + return newExchange; + } + + try { + String oldBody = oldExchange.getIn().getBody(String.class); + String newBody = newExchange.getIn().getBody(String.class); + String merged = oldBody + newBody; + //also do stream caching in the aggregation strategy + CachedOutputStream cos = new CachedOutputStream(newExchange); + cos.write(merged.getBytes("UTF-8")); + cos.close(); + oldExchange.getIn().setBody(cos.newStreamCache()); + return oldExchange; + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/0ae44a18/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamCachingInSubRouteTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamCachingInSubRouteTest.java b/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamCachingInSubRouteTest.java new file mode 100644 index 0000000..84d7700 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamCachingInSubRouteTest.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.converter.stream.CachedOutputStream; +import org.apache.camel.processor.aggregate.AggregationStrategy; + +public class SplitterStreamCachingInSubRouteTest extends ContextTestSupport { + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + context.setStreamCaching(true); + context.getStreamCachingStrategy().setEnabled(true); + context.getStreamCachingStrategy().setSpoolDirectory("target/camel/cache"); + context.getStreamCachingStrategy().setSpoolThreshold(1L); + + from("direct:startIterable").split(body().tokenize(",")).streaming().aggregationStrategy(new InternalAggregationStrategy()) + .stopOnException().parallelProcessing().to("direct:sub").end().to("mock:result"); + + from("direct:start").split(body().tokenize(",")).aggregationStrategy(new 
InternalAggregationStrategy()).stopOnException() + .parallelProcessing().to("direct:sub").end().to("mock:result"); + + from("direct:sub").process(new InputProcessorWithStreamCache(22)).to("mock:resultsub"); + + from("direct:startNested").split(body().tokenize(",")).aggregationStrategy(new InternalAggregationStrategy()) + .stopOnException().parallelProcessing().to("direct:start").end().to("mock:resultNested"); + } + + }; + } + + public void testWithAggregationStategyAndStreamCacheInSubRoute() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("Test Message 22"); + template.sendBody("direct:start", "<start></start>"); + + assertMockEndpointsSatisfied(); + } + + public void testStreamCacheIterableSplitter() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("Test Message 22"); + template.sendBody("direct:startIterable", "<start></start>"); + + assertMockEndpointsSatisfied(); + } + + public void testNested() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:resultNested"); + mock.expectedBodiesReceived("Test Message 22"); + template.sendBody("direct:startNested", "<start></start>"); + + assertMockEndpointsSatisfied(); + } + + public static class InputProcessorWithStreamCache implements Processor { + + private final int number; + + public InputProcessorWithStreamCache(int number) { + this.number = number; + } + + @Override + public void process(Exchange exchange) throws Exception { + + CachedOutputStream cos = new CachedOutputStream(exchange); + String s = "Test Message " + number; + cos.write(s.getBytes(Charset.forName("UTF-8"))); + cos.close(); + InputStream is = (InputStream) cos.newStreamCache(); + + exchange.getOut().setBody(is); + } + } + + public static class InternalAggregationStrategy implements AggregationStrategy { + + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + if (oldExchange == null) { + return 
newExchange; + } + try { + String oldBody = oldExchange.getIn().getBody(String.class); + String newBody = newExchange.getIn().getBody(String.class); + String merged = oldBody + newBody; + //also do stream caching in the aggregation strategy + CachedOutputStream cos = new CachedOutputStream(newExchange); + cos.write(merged.getBytes("UTF-8")); + cos.close(); + oldExchange.getIn().setBody(cos.newStreamCache()); + return oldExchange; + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + } + +}