Author: ningjiang Date: Fri Dec 2 10:56:36 2011 New Revision: 1209401 URL: http://svn.apache.org/viewvc?rev=1209401&view=rev Log: CAMEL-4724 reset the camel-context of the exchange on the seda producer
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/AbstractVmTestSupport.java camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmDifferentOptionsOnConsumerAndProducerTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=1209401&r1=1209400&r2=1209401&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Fri Dec 2 10:56:36 2011 @@ -37,6 +37,7 @@ import org.apache.camel.spi.ShutdownAwar import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorConverterHelper; import org.apache.camel.util.AsyncProcessorHelper; +import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -153,10 +154,20 @@ public class SedaConsumer extends Servic exchange = queue.poll(1000, TimeUnit.MILLISECONDS); if (exchange != null) { try { - sendToConsumers(exchange); - + // send a new copied exchange with new camel context + Exchange newExchange = ExchangeHelper.copyExchangeAndSetCamelContext(exchange, endpoint.getCamelContext()); + // set the fromEndpoint + newExchange.setFromEndpoint(endpoint); + sendToConsumers(newExchange); + // copy the message back + if (newExchange.hasOut()) { + exchange.setOut(newExchange.getOut().copy()); + } else { + exchange.setIn(newExchange.getIn()); + } // log exception if an exception occurred and was not handled - if (exchange.getException() != null) { + if (newExchange.getException() != null) { + exchange.setException(newExchange.getException()); getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); } } catch (Exception e) { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java?rev=1209401&r1=1209400&r2=1209401&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java Fri Dec 2 10:56:36 2011 @@ -18,6 +18,7 @@ package org.apache.camel.util; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -37,6 +38,7 @@ import org.apache.camel.NoSuchHeaderExce import org.apache.camel.NoSuchPropertyException; import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.TypeConverter; +import org.apache.camel.impl.DefaultExchange; import org.apache.camel.spi.UnitOfWork; /** @@ -651,4 +653,27 @@ public final class ExchangeHelper { exchange.setOut(null); } } + + public static Exchange copyExchangeAndSetCamelContext(Exchange exchange, CamelContext context) { + DefaultExchange answer = new DefaultExchange(context, exchange.getPattern()); + if (exchange.hasProperties()) { + answer.setProperties(safeCopy(exchange.getProperties())); + } + // Need to hand over the completion for async invocation + exchange.handoverCompletions(answer); + answer.setIn(exchange.getIn().copy()); + if (exchange.hasOut()) { + answer.setOut(exchange.getOut().copy()); + } + answer.setException(exchange.getException()); + return answer; + + } + + private static Map<String, Object> safeCopy(Map<String, Object> properties) { + if (properties == null) { + return null; + } + return new ConcurrentHashMap<String, Object>(properties); + } } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/AbstractVmTestSupport.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/AbstractVmTestSupport.java?rev=1209401&r1=1209400&r2=1209401&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/AbstractVmTestSupport.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/AbstractVmTestSupport.java Fri Dec 2 10:56:36 2011 @@ -39,6 +39,7 @@ public abstract class AbstractVmTestSupp super.setUp(); context2 = new DefaultCamelContext(); + RouteBuilder routeBuilder = createRouteBuilderForSecondContext(); if (routeBuilder != null) { context2.addRoutes(routeBuilder); Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmDifferentOptionsOnConsumerAndProducerTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmDifferentOptionsOnConsumerAndProducerTest.java?rev=1209401&r1=1209400&r2=1209401&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmDifferentOptionsOnConsumerAndProducerTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmDifferentOptionsOnConsumerAndProducerTest.java Fri Dec 2 10:56:36 2011 @@ -17,6 +17,7 @@ package org.apache.camel.component.vm; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; import org.junit.Test; /** @@ -26,11 +27,16 @@ public class VmDifferentOptionsOnConsume @Test public void testSendToVm() throws Exception { - getMockEndpoint("mock:result").expectedBodiesReceived("Hello World"); + MockEndpoint result = getMockEndpoint("mock:result"); + result.expectedBodiesReceived("Hello World"); + template2.sendBody("direct:start", "Hello World"); assertMockEndpointsSatisfied(); + + // check the camel context of the exchange + assertEquals("Get a wrong context. ", context, result.getExchanges().get(0).getContext()); } @Override