CAMEL-7787: Multicast - Should defer UoW done until after the aggregate has 
been done. Thanks to Franz Forsthofer for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0ae44a18
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0ae44a18
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0ae44a18

Branch: refs/heads/camel-2.14.x
Commit: 0ae44a185ca64ab1d4143fba94c7d8d9e6c98499
Parents: 5c67207
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sun Dec 21 14:18:05 2014 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sun Dec 21 16:10:43 2014 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/camel/Exchange.java    |   1 +
 .../converter/stream/CachedOutputStream.java    |  21 +++-
 .../camel/processor/MulticastProcessor.java     |   9 ++
 .../org/apache/camel/processor/Splitter.java    |  10 ++
 .../MultiCastStreamCachingInSubRouteTest.java   | 120 ++++++++++++++++++
 .../SplitterStreamCachingInSubRouteTest.java    | 126 +++++++++++++++++++
 6 files changed, 284 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0ae44a18/camel-core/src/main/java/org/apache/camel/Exchange.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/Exchange.java 
b/camel-core/src/main/java/org/apache/camel/Exchange.java
index a33253a..ab12c89 100644
--- a/camel-core/src/main/java/org/apache/camel/Exchange.java
+++ b/camel-core/src/main/java/org/apache/camel/Exchange.java
@@ -167,6 +167,7 @@ public interface Exchange {
     String OVERRULE_FILE_NAME = "CamelOverruleFileName";
 
     String PARENT_UNIT_OF_WORK = "CamelParentUnitOfWork";
+    String STREAM_CACHE_UNIT_OF_WORK = "CamelStreamCacheUnitOfWork";
     
     String RECIPIENT_LIST_ENDPOINT = "CamelRecipientListEndpoint";
     String RECEIVED_TIMESTAMP      = "CamelReceivedTimestamp";

http://git-wip-us.apache.org/repos/asf/camel/blob/0ae44a18/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
 
b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
index 63cedc3..616b2ed 100644
--- 
a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
+++ 
b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
@@ -25,11 +25,14 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.security.GeneralSecurityException;
+
 import javax.crypto.CipherOutputStream;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.StreamCache;
 import org.apache.camel.spi.StreamCachingStrategy;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.util.FileUtil;
 import org.apache.camel.util.ObjectHelper;
@@ -78,7 +81,7 @@ public class CachedOutputStream extends OutputStream {
         currentStream = new 
CachedByteArrayOutputStream(strategy.getBufferSize());
         if (closedOnCompletion) {
             // add on completion so we can cleanup after the exchange is done 
such as deleting temporary files
-            exchange.addOnCompletion(new SynchronizationAdapter() {
+            Synchronization onCompletion = new SynchronizationAdapter() {
                 @Override
                 public void onDone(Exchange exchange) {
                     try {
@@ -95,12 +98,24 @@ public class CachedOutputStream extends OutputStream {
                         LOG.warn("Error closing streams. This exception will 
be ignored.", e);
                     }
                 }
-        
+
                 @Override
                 public String toString() {
                     return "OnCompletion[CachedOutputStream]";
                 }
-            });
+            };
+
+            UnitOfWork streamCacheUnitOfWork = 
exchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, UnitOfWork.class);
+            if (streamCacheUnitOfWork != null) {
+                // The stream cache must sometimes not be closed when the 
exchange is deleted. This is for example the
+                // case in the splitter and multi-cast case with 
AggregationStrategy where the result of the sub-routes
+                // are aggregated later in the main route. Here, the cached 
streams of the sub-routes must be closed with
+                // the Unit of Work of the main route.
+                streamCacheUnitOfWork.addSynchronization(onCompletion);
+            } else {
+                // add on completion so we can cleanup after the exchange is 
done such as deleting temporary files
+                exchange.addOnCompletion(onCompletion);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/0ae44a18/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java 
b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 38e70bb..6c1a54c 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -864,6 +864,15 @@ public class MulticastProcessor extends ServiceSupport 
implements AsyncProcessor
             // copy exchange, and do not share the unit of work
             Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, 
false);
 
+            // If the multi-cast processor has an aggregation strategy
+            // then the StreamCache created by the child routes must not be 
+            // closed by the unit of work of the child route, but by the unit 
of 
+            // work of the parent route or grand parent route or grand grand 
parent route ...(in case of nesting).
+            // Set therefore the unit of work of the  parent route as stream 
cache unit of work, 
+            // if it is not already set.
+            if (copy.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK) == null) {
+                copy.setProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, 
exchange.getUnitOfWork());
+            }
             // if we share unit of work, we need to prepare the child exchange
             if (isShareUnitOfWork()) {
                 prepareSharedUnitOfWork(copy, exchange);

http://git-wip-us.apache.org/repos/asf/camel/blob/0ae44a18/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java 
b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
index 314de20..ec9b258 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
@@ -139,8 +139,10 @@ public class Splitter extends MulticastProcessor 
implements AsyncProcessor, Trac
         final Iterator<?> iterator;
         private final Exchange copy;
         private final RouteContext routeContext;
+        private final Exchange original;
 
         private SplitterIterable(Exchange exchange, Object value) {
+            this.original = exchange;
             this.value = value;
             this.iterator = ObjectHelper.createIterator(value);
             this.copy = copyExchangeNoAttachments(exchange, true);
@@ -177,6 +179,14 @@ public class Splitter extends MulticastProcessor 
implements AsyncProcessor, Trac
                     // create a correlated copy as the new exchange to be 
routed in the splitter from the copy
                     // and do not share the unit of work
                     Exchange newExchange = 
ExchangeHelper.createCorrelatedCopy(copy, false);
+                    // If the splitter has an aggregation strategy
+                    // then the StreamCache created by the child routes must 
not be 
+                    // closed by the unit of work of the child route, but by 
the unit of 
+                    // work of the parent route or grand parent route or grand 
grand parent route... (in case of nesting).
+                    // Therefore, set the unit of work of the parent route as 
stream cache unit of work, if not already set.
+                    if 
(newExchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK) == null) {
+                        
newExchange.setProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, 
original.getUnitOfWork());
+                    }
                     // if we share unit of work, we need to prepare the child 
exchange
                     if (isShareUnitOfWork()) {
                         prepareSharedUnitOfWork(newExchange, copy);

http://git-wip-us.apache.org/repos/asf/camel/blob/0ae44a18/camel-core/src/test/java/org/apache/camel/processor/MultiCastStreamCachingInSubRouteTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/MultiCastStreamCachingInSubRouteTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/MultiCastStreamCachingInSubRouteTest.java
new file mode 100644
index 0000000..aa8da23
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/MultiCastStreamCachingInSubRouteTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.converter.stream.CachedOutputStream;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+public class MultiCastStreamCachingInSubRouteTest extends ContextTestSupport {
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.setStreamCaching(true);
+                context.getStreamCachingStrategy().setEnabled(true);
+                
context.getStreamCachingStrategy().setSpoolDirectory("target/camel/cache");
+                context.getStreamCachingStrategy().setSpoolThreshold(1L);
+
+                from("direct:start").multicast(new 
InternalAggregationStrategy()).to("direct:a", 
"direct:b").end().to("mock:result");
+
+                from("direct:startNestedMultiCast").multicast(new 
InternalAggregationStrategy()).to("direct:start").end()
+                        .to("mock:resultNested");
+
+                from("direct:a") //
+                        .process(new InputProcessorWithStreamCache(1)) //
+                        .to("mock:resulta");
+
+                from("direct:b") //
+                        .process(new InputProcessorWithStreamCache(2)) //
+                        .to("mock:resultb");
+            }
+        };
+    }
+
+    public void testWithAggregationStrategyAndStreamCacheInSubRoute() throws 
Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Test Message 1Test Message 2");
+        template.sendBody("direct:start", "<start></start>");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testNestedMultiCastWithCachedStreamInAggregationStrategy() 
throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:resultNested");
+        mock.expectedBodiesReceived("Test Message 1Test Message 2");
+        template.sendBody("direct:startNestedMultiCast", "<start></start>");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public static class InputProcessorWithStreamCache implements Processor {
+
+        private final int number;
+
+        public InputProcessorWithStreamCache(int number) {
+            this.number = number;
+        }
+
+        @Override
+        public void process(Exchange exchange) throws Exception {
+
+            CachedOutputStream cos = new CachedOutputStream(exchange);
+            String s = "Test Message " + number;
+            cos.write(s.getBytes(Charset.forName("UTF-8")));
+            cos.close();
+            InputStream is = (InputStream) cos.newStreamCache();
+            exchange.getOut().setBody(is);
+
+        }
+    }
+
+    public static class InternalAggregationStrategy implements 
AggregationStrategy {
+
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange == null) {
+                return newExchange;
+            }
+
+            try {
+                String oldBody = oldExchange.getIn().getBody(String.class);
+                String newBody = newExchange.getIn().getBody(String.class);
+                String merged = oldBody + newBody;
+                //also do stream caching in the aggregation strategy           
 
+                CachedOutputStream cos = new CachedOutputStream(newExchange);
+                cos.write(merged.getBytes("UTF-8"));
+                cos.close();
+                oldExchange.getIn().setBody(cos.newStreamCache());
+                return oldExchange;
+            } catch (IOException e) {
+                throw new IllegalStateException(e);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0ae44a18/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamCachingInSubRouteTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamCachingInSubRouteTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamCachingInSubRouteTest.java
new file mode 100644
index 0000000..84d7700
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamCachingInSubRouteTest.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.converter.stream.CachedOutputStream;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+public class SplitterStreamCachingInSubRouteTest extends ContextTestSupport {
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.setStreamCaching(true);
+                context.getStreamCachingStrategy().setEnabled(true);
+                
context.getStreamCachingStrategy().setSpoolDirectory("target/camel/cache");
+                context.getStreamCachingStrategy().setSpoolThreshold(1L);
+
+                
from("direct:startIterable").split(body().tokenize(",")).streaming().aggregationStrategy(new
 InternalAggregationStrategy())
+                        
.stopOnException().parallelProcessing().to("direct:sub").end().to("mock:result");
+
+                
from("direct:start").split(body().tokenize(",")).aggregationStrategy(new 
InternalAggregationStrategy()).stopOnException()
+                        
.parallelProcessing().to("direct:sub").end().to("mock:result");
+
+                from("direct:sub").process(new 
InputProcessorWithStreamCache(22)).to("mock:resultsub");
+
+                
from("direct:startNested").split(body().tokenize(",")).aggregationStrategy(new 
InternalAggregationStrategy())
+                        
.stopOnException().parallelProcessing().to("direct:start").end().to("mock:resultNested");
+            }
+
+        };
+    }
+
+    public void testWithAggregationStategyAndStreamCacheInSubRoute() throws 
Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Test Message 22");
+        template.sendBody("direct:start", "<start></start>");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testStreamCacheIterableSplitter() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Test Message 22");
+        template.sendBody("direct:startIterable", "<start></start>");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testNested() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:resultNested");
+        mock.expectedBodiesReceived("Test Message 22");
+        template.sendBody("direct:startNested", "<start></start>");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public static class InputProcessorWithStreamCache implements Processor {
+
+        private final int number;
+
+        public InputProcessorWithStreamCache(int number) {
+            this.number = number;
+        }
+
+        @Override
+        public void process(Exchange exchange) throws Exception {
+
+            CachedOutputStream cos = new CachedOutputStream(exchange);
+            String s = "Test Message " + number;
+            cos.write(s.getBytes(Charset.forName("UTF-8")));
+            cos.close();
+            InputStream is = (InputStream) cos.newStreamCache();
+
+            exchange.getOut().setBody(is);
+        }
+    }
+
+    public static class InternalAggregationStrategy implements 
AggregationStrategy {
+
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange == null) {
+                return newExchange;
+            }
+            try {
+                String oldBody = oldExchange.getIn().getBody(String.class);
+                String newBody = newExchange.getIn().getBody(String.class);
+                String merged = oldBody + newBody;
+                //also do stream caching in the aggregation strategy           
 
+                CachedOutputStream cos = new CachedOutputStream(newExchange);
+                cos.write(merged.getBytes("UTF-8"));
+                cos.close();
+                oldExchange.getIn().setBody(cos.newStreamCache());
+                return oldExchange;
+            } catch (IOException e) {
+                throw new IllegalStateException(e);
+            }
+        }
+    }
+
+}

Reply via email to