Author: davsclaus
Date: Fri Apr 30 09:23:24 2010
New Revision: 939597

URL: http://svn.apache.org/viewvc?rev=939597&view=rev
Log:
CAMEL-2682: Splitter now returns original exchange as outbound by default.

Added:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseOriginalAggregationStrategy.java
      - copied, changed from r939563, 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseLatestAggregationStrategy.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterNoAggregationStrategyTest.java
      - copied, changed from r939563, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterStopOnExceptionTest.java
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownSedaAndDirectEndpointTest.java
    
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/issues/ProduceSplitMethodCallIssueTest.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java?rev=939597&r1=939596&r2=939597&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
 Fri Apr 30 09:23:24 2010
@@ -29,8 +29,9 @@ import org.apache.camel.builder.Expressi
 import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.processor.Splitter;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
-import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
+import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy;
 import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.CamelContextHelper;
 import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 
 /**
@@ -103,11 +104,11 @@ public class SplitDefinition extends Exp
     private AggregationStrategy createAggregationStrategy(RouteContext 
routeContext) {
         AggregationStrategy strategy = getAggregationStrategy();
         if (strategy == null && strategyRef != null) {
-            strategy = routeContext.lookup(strategyRef, 
AggregationStrategy.class);
+            strategy = 
CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), strategyRef, 
AggregationStrategy.class);
         }
         if (strategy == null) {
-            // fallback to use latest
-            strategy = new UseLatestAggregationStrategy();
+            // fallback to keep the original exchange strategy
+            strategy = new UseOriginalAggregationStrategy(true);
         }
         return strategy;
     }        

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=939597&r1=939596&r2=939597&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 Fri Apr 30 09:23:24 2010
@@ -101,7 +101,7 @@ public class MulticastProcessor extends 
 
     private final CamelContext camelContext;
     private Collection<Processor> processors;
-    private final AggregationStrategy aggregationStrategy;
+    private AggregationStrategy aggregationStrategy;
     private final boolean parallelProcessing;
     private final boolean streaming;
     private final boolean stopOnException;
@@ -408,6 +408,10 @@ public class MulticastProcessor extends 
         return aggregationStrategy;
     }
 
+    public void setAggregationStrategy(AggregationStrategy 
aggregationStrategy) {
+        this.aggregationStrategy = aggregationStrategy;
+    }
+
     public boolean isParallelProcessing() {
         return parallelProcessing;
     }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java?rev=939597&r1=939596&r2=939597&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java 
(original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java 
Fri Apr 30 09:23:24 2010
@@ -29,6 +29,7 @@ import org.apache.camel.Expression;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy;
 import org.apache.camel.util.CollectionHelper;
 import org.apache.camel.util.ObjectHelper;
 
@@ -69,6 +70,26 @@ public class Splitter extends MulticastP
     }
 
     @Override
+    public void process(Exchange exchange) throws Exception {
+        AggregationStrategy strategy = getAggregationStrategy();
+
+        // if original aggregation strategy then store exchange
+        // on it as the original exchange
+        UseOriginalAggregationStrategy original = null;
+        if (strategy instanceof UseOriginalAggregationStrategy) {
+            original = (UseOriginalAggregationStrategy) strategy;
+            original.setOriginal(exchange);
+        }
+
+        super.process(exchange);
+
+        if (original != null) {
+            // and remove the reference when we are done (due to thread local 
stuff)
+            original.setOriginal(null);
+        }
+    }
+
+    @Override
     protected Iterable<ProcessorExchangePair> 
createProcessorExchangePairs(Exchange exchange) {
         Object value = expression.evaluate(exchange, Object.class);
 

Copied: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseOriginalAggregationStrategy.java
 (from r939563, 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseLatestAggregationStrategy.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseOriginalAggregationStrategy.java?p2=camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseOriginalAggregationStrategy.java&p1=camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseLatestAggregationStrategy.java&r1=939563&r2=939597&rev=939597&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseLatestAggregationStrategy.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseOriginalAggregationStrategy.java
 Fri Apr 30 09:23:24 2010
@@ -19,22 +19,50 @@ package org.apache.camel.processor.aggre
 import org.apache.camel.Exchange;
 
 /**
- * An {@link AggregationStrategy} which just uses the latest exchange which 
is useful
- * for status messages where old status messages have no real value. Another 
example is things
- * like market data prices, where old stock prices are not that relevant, only 
the current price is.
+ * An {@link org.apache.camel.processor.aggregate.AggregationStrategy} 
which just uses the original exchange
+ * which can be needed when you want to preserve the original Exchange. For 
example when splitting an Exchange
+ * and then you may want to keep routing using the original Exchange.
+ * <p/>
+ * You must call {@link #setOriginal(org.apache.camel.Exchange)} before 
this aggregation strategy can be used,
+ * as it needs to have a reference to the original exchange.
  *
+ * @see org.apache.camel.processor.Splitter
  * @version $Revision$
  */
-public class UseLatestAggregationStrategy implements AggregationStrategy {
+public class UseOriginalAggregationStrategy implements AggregationStrategy {
+
+    // must use a thread local to cater for concurrency
+    private final ThreadLocal<Exchange> original = new ThreadLocal<Exchange>();
+    private final boolean propagateException;
+
+    public UseOriginalAggregationStrategy(boolean propagateException) {
+        this.propagateException = propagateException;
+    }
+
+    public void setOriginal(Exchange exchange) {
+        if (exchange == null) {
+            // clear it
+            this.original.remove();
+        } else {
+            this.original.set(exchange);
+        }
+    }
 
     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
-        if (newExchange == null) {
-            return oldExchange;
+        Exchange answer = original.get();
+        if (answer == null) {
+            throw new IllegalStateException("Original Exchange has not been 
set");
         }
-        newExchange.setException(checkException(oldExchange, newExchange));
-        return newExchange;
+
+        if (propagateException) {
+            Exception exception = checkException(oldExchange, newExchange);
+            if (exception != null) {
+                answer.setException(exception);
+            }
+        }
+        return answer;
     }
-    
+
     protected Exception checkException(Exchange oldExchange, Exchange 
newExchange) {
         if (oldExchange == null) {
             return newExchange.getException();
@@ -47,6 +75,6 @@ public class UseLatestAggregationStrateg
 
     @Override
     public String toString() {
-        return "UseLatestAggregationStrategy";
+        return "UseOriginalAggregationStrategy";
     }
-}
+}
\ No newline at end of file

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownSedaAndDirectEndpointTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownSedaAndDirectEndpointTest.java?rev=939597&r1=939596&r2=939597&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownSedaAndDirectEndpointTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownSedaAndDirectEndpointTest.java
 Fri Apr 30 09:23:24 2010
@@ -39,6 +39,8 @@ public class ShutdownSedaAndDirectEndpoi
 
         context.stop();
 
+        assertMockEndpointsSatisfied();
+
         assertEquals("Should complete all messages", 5, 
bar.getReceivedCounter());
     }
 
@@ -52,7 +54,7 @@ public class ShutdownSedaAndDirectEndpoi
                     .to("direct:bar");
 
                 from("direct:bar")
-                    .delay(1000)
+                    .delay(250)
                     .to("mock:bar");
             }
         };

Copied: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterNoAggregationStrategyTest.java
 (from r939563, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterStopOnExceptionTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterNoAggregationStrategyTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterNoAggregationStrategyTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterStopOnExceptionTest.java&r1=939563&r2=939597&rev=939597&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterStopOnExceptionTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterNoAggregationStrategyTest.java
 Fri Apr 30 09:23:24 2010
@@ -27,29 +27,32 @@ import org.apache.camel.component.mock.M
 /**
  * @version $Revision$
  */
-public class SplitterStopOnExceptionTest extends ContextTestSupport {
+public class SplitterNoAggregationStrategyTest extends ContextTestSupport {
 
-    public void testSplitStopOnExceptionOk() throws Exception {
+    public void testSplitNoAggregationStrategy() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:split");
         mock.expectedBodiesReceived("Hello World", "Bye World");
 
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedBodiesReceived("Hello World,Bye World");
+
         template.sendBody("direct:start", "Hello World,Bye World");
 
         assertMockEndpointsSatisfied();
     }
 
-    public void testSplitStopOnExceptionStop() throws Exception {
+    public void testSplitNoAggregationStrategyException() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:split");
-        // we do stop so we stop splitting when the exception occurs and thus 
we only receive 1 message
-        mock.expectedBodiesReceived("Hello World");
+        mock.expectedBodiesReceived("Hello World", "Bye World");
+
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(0);
 
         try {
             template.sendBody("direct:start", "Hello World,Kaboom,Bye World");
             fail("Should thrown an exception");
         } catch (CamelExecutionException e) {
-            CamelExchangeException cause = 
assertIsInstanceOf(CamelExchangeException.class, e.getCause());
-            assertTrue(cause.getMessage().startsWith("Sequential processing 
failed for number 1. Exchange[Message: Kaboom]"));
-            assertEquals("Forced", cause.getCause().getMessage());
+            assertEquals("Forced", e.getCause().getMessage());
         }
 
         assertMockEndpointsSatisfied();
@@ -61,9 +64,11 @@ public class SplitterStopOnExceptionTest
             @Override
             public void configure() throws Exception {
                 from("direct:start")
-                    .split(body().tokenize(",")).stopOnException()
+                    .split(body().tokenize(","))
                         .process(new MyProcessor())
-                        .to("mock:split");
+                        .to("mock:split")
+                    .end()
+                    .to("mock:result");
             }
         };
     }
@@ -77,4 +82,4 @@ public class SplitterStopOnExceptionTest
             }
         }
     }
-}
+}
\ No newline at end of file

Modified: 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/issues/ProduceSplitMethodCallIssueTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/issues/ProduceSplitMethodCallIssueTest.java?rev=939597&r1=939596&r2=939597&view=diff
==============================================================================
--- 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/issues/ProduceSplitMethodCallIssueTest.java
 (original)
+++ 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/issues/ProduceSplitMethodCallIssueTest.java
 Fri Apr 30 09:23:24 2010
@@ -35,7 +35,8 @@ public class ProduceSplitMethodCallIssue
 
         CoolService cool = context.getRegistry().lookup("cool", 
CoolService.class);
         String out = cool.stuff("A,B");
-        assertEquals("Hello B", out);
+        // keeps the original message
+        assertEquals("A,B", out);
 
         assertMockEndpointsSatisfied();
     }


Reply via email to