Author: davsclaus
Date: Fri Apr 30 09:23:24 2010
New Revision: 939597
URL: http://svn.apache.org/viewvc?rev=939597&view=rev
Log:
CAMEL-2682: Splitter now returns original exchange as outbound by default.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseOriginalAggregationStrategy.java
- copied, changed from r939563,
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseLatestAggregationStrategy.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterNoAggregationStrategyTest.java
- copied, changed from r939563,
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterStopOnExceptionTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownSedaAndDirectEndpointTest.java
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/issues/ProduceSplitMethodCallIssueTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java?rev=939597&r1=939596&r2=939597&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
Fri Apr 30 09:23:24 2010
@@ -29,8 +29,9 @@ import org.apache.camel.builder.Expressi
import org.apache.camel.model.language.ExpressionDefinition;
import org.apache.camel.processor.Splitter;
import org.apache.camel.processor.aggregate.AggregationStrategy;
-import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
+import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy;
import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.CamelContextHelper;
import org.apache.camel.util.concurrent.ExecutorServiceHelper;
/**
@@ -103,11 +104,11 @@ public class SplitDefinition extends Exp
private AggregationStrategy createAggregationStrategy(RouteContext
routeContext) {
AggregationStrategy strategy = getAggregationStrategy();
if (strategy == null && strategyRef != null) {
- strategy = routeContext.lookup(strategyRef,
AggregationStrategy.class);
+ strategy =
CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), strategyRef,
AggregationStrategy.class);
}
if (strategy == null) {
- // fallback to use latest
- strategy = new UseLatestAggregationStrategy();
+ // fallback to keep the original exchange strategy
+ strategy = new UseOriginalAggregationStrategy(true);
}
return strategy;
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=939597&r1=939596&r2=939597&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
Fri Apr 30 09:23:24 2010
@@ -101,7 +101,7 @@ public class MulticastProcessor extends
private final CamelContext camelContext;
private Collection<Processor> processors;
- private final AggregationStrategy aggregationStrategy;
+ private AggregationStrategy aggregationStrategy;
private final boolean parallelProcessing;
private final boolean streaming;
private final boolean stopOnException;
@@ -408,6 +408,10 @@ public class MulticastProcessor extends
return aggregationStrategy;
}
+ public void setAggregationStrategy(AggregationStrategy
aggregationStrategy) {
+ this.aggregationStrategy = aggregationStrategy;
+ }
+
public boolean isParallelProcessing() {
return parallelProcessing;
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java?rev=939597&r1=939596&r2=939597&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
Fri Apr 30 09:23:24 2010
@@ -29,6 +29,7 @@ import org.apache.camel.Expression;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy;
import org.apache.camel.util.CollectionHelper;
import org.apache.camel.util.ObjectHelper;
@@ -69,6 +70,26 @@ public class Splitter extends MulticastP
}
@Override
+ public void process(Exchange exchange) throws Exception {
+ AggregationStrategy strategy = getAggregationStrategy();
+
+ // if original aggregation strategy then store exchange
+ // on it as the original exchange
+ UseOriginalAggregationStrategy original = null;
+ if (strategy instanceof UseOriginalAggregationStrategy) {
+ original = (UseOriginalAggregationStrategy) strategy;
+ original.setOriginal(exchange);
+ }
+
+ super.process(exchange);
+
+ if (original != null) {
+ // and remove the reference when we are done (due to thread local
stuff)
+ original.setOriginal(null);
+ }
+ }
+
+ @Override
protected Iterable<ProcessorExchangePair>
createProcessorExchangePairs(Exchange exchange) {
Object value = expression.evaluate(exchange, Object.class);
Copied:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseOriginalAggregationStrategy.java
(from r939563,
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseLatestAggregationStrategy.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseOriginalAggregationStrategy.java?p2=camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseOriginalAggregationStrategy.java&p1=camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseLatestAggregationStrategy.java&r1=939563&r2=939597&rev=939597&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseLatestAggregationStrategy.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseOriginalAggregationStrategy.java
Fri Apr 30 09:23:24 2010
@@ -19,22 +19,50 @@ package org.apache.camel.processor.aggre
import org.apache.camel.Exchange;
/**
- * An {@link AggregationStrategy} which just uses the latest exchange which
is useful
- * for status messages where old status messages have no real value. Another
example is things
- * like market data prices, where old stock prices are not that relevant, only
the current price is.
+ * An {@link org.apache.camel.processor.aggregate.AggregationStrategy}
which just uses the original exchange
+ * which can be needed when you want to preserve the original Exchange. For
example when splitting an Exchange
+ * and then you may want to keep routing using the original Exchange.
+ * <p/>
+ * You must call {@link #setOriginal(org.apache.camel.Exchange)} before
this aggregation strategy can be used,
+ * as it needs to have a reference to the original exchange.
*
+ * @see org.apache.camel.processor.Splitter
* @version $Revision$
*/
-public class UseLatestAggregationStrategy implements AggregationStrategy {
+public class UseOriginalAggregationStrategy implements AggregationStrategy {
+
+ // must use a thread local to cater for concurrency
+ private final ThreadLocal<Exchange> original = new ThreadLocal<Exchange>();
+ private final boolean propagateException;
+
+ public UseOriginalAggregationStrategy(boolean propagateException) {
+ this.propagateException = propagateException;
+ }
+
+ public void setOriginal(Exchange exchange) {
+ if (exchange == null) {
+ // clear it
+ this.original.remove();
+ } else {
+ this.original.set(exchange);
+ }
+ }
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
- if (newExchange == null) {
- return oldExchange;
+ Exchange answer = original.get();
+ if (answer == null) {
+ throw new IllegalStateException("Original Exchange has not been
set");
}
- newExchange.setException(checkException(oldExchange, newExchange));
- return newExchange;
+
+ if (propagateException) {
+ Exception exception = checkException(oldExchange, newExchange);
+ if (exception != null) {
+ answer.setException(exception);
+ }
+ }
+ return answer;
}
-
+
protected Exception checkException(Exchange oldExchange, Exchange
newExchange) {
if (oldExchange == null) {
return newExchange.getException();
@@ -47,6 +75,6 @@ public class UseLatestAggregationStrateg
@Override
public String toString() {
- return "UseLatestAggregationStrategy";
+ return "UseOriginalAggregationStrategy";
}
-}
+}
\ No newline at end of file
Modified:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownSedaAndDirectEndpointTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownSedaAndDirectEndpointTest.java?rev=939597&r1=939596&r2=939597&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownSedaAndDirectEndpointTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownSedaAndDirectEndpointTest.java
Fri Apr 30 09:23:24 2010
@@ -39,6 +39,8 @@ public class ShutdownSedaAndDirectEndpoi
context.stop();
+ assertMockEndpointsSatisfied();
+
assertEquals("Should complete all messages", 5,
bar.getReceivedCounter());
}
@@ -52,7 +54,7 @@ public class ShutdownSedaAndDirectEndpoi
.to("direct:bar");
from("direct:bar")
- .delay(1000)
+ .delay(250)
.to("mock:bar");
}
};
Copied:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterNoAggregationStrategyTest.java
(from r939563,
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterStopOnExceptionTest.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterNoAggregationStrategyTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterNoAggregationStrategyTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterStopOnExceptionTest.java&r1=939563&r2=939597&rev=939597&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterStopOnExceptionTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterNoAggregationStrategyTest.java
Fri Apr 30 09:23:24 2010
@@ -27,29 +27,32 @@ import org.apache.camel.component.mock.M
/**
* @version $Revision$
*/
-public class SplitterStopOnExceptionTest extends ContextTestSupport {
+public class SplitterNoAggregationStrategyTest extends ContextTestSupport {
- public void testSplitStopOnExceptionOk() throws Exception {
+ public void testSplitNoAggregationStrategy() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:split");
mock.expectedBodiesReceived("Hello World", "Bye World");
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedBodiesReceived("Hello World,Bye World");
+
template.sendBody("direct:start", "Hello World,Bye World");
assertMockEndpointsSatisfied();
}
- public void testSplitStopOnExceptionStop() throws Exception {
+ public void testSplitNoAggregationStrategyException() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:split");
- // we do stop so we stop splitting when the exception occurs and thus
we only receive 1 message
- mock.expectedBodiesReceived("Hello World");
+ mock.expectedBodiesReceived("Hello World", "Bye World");
+
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedMessageCount(0);
try {
template.sendBody("direct:start", "Hello World,Kaboom,Bye World");
fail("Should thrown an exception");
} catch (CamelExecutionException e) {
- CamelExchangeException cause =
assertIsInstanceOf(CamelExchangeException.class, e.getCause());
- assertTrue(cause.getMessage().startsWith("Sequential processing
failed for number 1. Exchange[Message: Kaboom]"));
- assertEquals("Forced", cause.getCause().getMessage());
+ assertEquals("Forced", e.getCause().getMessage());
}
assertMockEndpointsSatisfied();
@@ -61,9 +64,11 @@ public class SplitterStopOnExceptionTest
@Override
public void configure() throws Exception {
from("direct:start")
- .split(body().tokenize(",")).stopOnException()
+ .split(body().tokenize(","))
.process(new MyProcessor())
- .to("mock:split");
+ .to("mock:split")
+ .end()
+ .to("mock:result");
}
};
}
@@ -77,4 +82,4 @@ public class SplitterStopOnExceptionTest
}
}
}
-}
+}
\ No newline at end of file
Modified:
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/issues/ProduceSplitMethodCallIssueTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/issues/ProduceSplitMethodCallIssueTest.java?rev=939597&r1=939596&r2=939597&view=diff
==============================================================================
---
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/issues/ProduceSplitMethodCallIssueTest.java
(original)
+++
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/issues/ProduceSplitMethodCallIssueTest.java
Fri Apr 30 09:23:24 2010
@@ -35,7 +35,8 @@ public class ProduceSplitMethodCallIssue
CoolService cool = context.getRegistry().lookup("cool",
CoolService.class);
String out = cool.stuff("A,B");
- assertEquals("Hello B", out);
+ // keeps the original message
+ assertEquals("A,B", out);
assertMockEndpointsSatisfied();
}