Author: ningjiang
Date: Fri Dec  2 10:56:36 2011
New Revision: 1209401

URL: http://svn.apache.org/viewvc?rev=1209401&view=rev
Log:
CAMEL-4724 reset the camel-context of the exchange on the seda producer

Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/AbstractVmTestSupport.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmDifferentOptionsOnConsumerAndProducerTest.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=1209401&r1=1209400&r2=1209401&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Fri Dec  2 10:56:36 2011
@@ -37,6 +37,7 @@ import org.apache.camel.spi.ShutdownAwar
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorConverterHelper;
 import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -153,10 +154,20 @@ public class SedaConsumer extends Servic
                 exchange = queue.poll(1000, TimeUnit.MILLISECONDS);
                 if (exchange != null) {
                     try {
-                        sendToConsumers(exchange);
-
+                        // send a new copied exchange with new camel context
+                        Exchange newExchange = ExchangeHelper.copyExchangeAndSetCamelContext(exchange, endpoint.getCamelContext());
+                        // set the fromEndpoint 
+                        newExchange.setFromEndpoint(endpoint);
+                        sendToConsumers(newExchange);
+                        // copy the message back
+                        if (newExchange.hasOut()) {
+                            exchange.setOut(newExchange.getOut().copy());
+                        } else {
+                            exchange.setIn(newExchange.getIn());
+                        }
                         // log exception if an exception occurred and was not handled
-                        if (exchange.getException() != null) {
+                        if (newExchange.getException() != null) {
+                            exchange.setException(newExchange.getException());
                             getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
                         }
                     } catch (Exception e) {

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java?rev=1209401&r1=1209400&r2=1209401&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java Fri Dec  2 10:56:36 2011
@@ -18,6 +18,7 @@ package org.apache.camel.util;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -37,6 +38,7 @@ import org.apache.camel.NoSuchHeaderExce
 import org.apache.camel.NoSuchPropertyException;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.TypeConverter;
+import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.spi.UnitOfWork;
 
 /**
@@ -651,4 +653,27 @@ public final class ExchangeHelper {
             exchange.setOut(null);
         }
     }
+    
+    public static Exchange copyExchangeAndSetCamelContext(Exchange exchange, CamelContext context) {
+        DefaultExchange answer = new DefaultExchange(context, exchange.getPattern());
+        if (exchange.hasProperties()) {
+            answer.setProperties(safeCopy(exchange.getProperties()));
+        }
+        // Need to hand over the completion for async invocation
+        exchange.handoverCompletions(answer);        
+        answer.setIn(exchange.getIn().copy());
+        if (exchange.hasOut()) {
+            answer.setOut(exchange.getOut().copy());
+        }
+        answer.setException(exchange.getException());
+        return answer;
+        
+    }
+    
+    private static Map<String, Object> safeCopy(Map<String, Object> properties) {
+        if (properties == null) {
+            return null;
+        }
+        return new ConcurrentHashMap<String, Object>(properties);
+    }
 }

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/AbstractVmTestSupport.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/AbstractVmTestSupport.java?rev=1209401&r1=1209400&r2=1209401&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/AbstractVmTestSupport.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/AbstractVmTestSupport.java
 Fri Dec  2 10:56:36 2011
@@ -39,6 +39,7 @@ public abstract class AbstractVmTestSupp
         super.setUp();
         
         context2 = new DefaultCamelContext();
+        
         RouteBuilder routeBuilder = createRouteBuilderForSecondContext();
         if (routeBuilder != null) {
             context2.addRoutes(routeBuilder);            

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmDifferentOptionsOnConsumerAndProducerTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmDifferentOptionsOnConsumerAndProducerTest.java?rev=1209401&r1=1209400&r2=1209401&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmDifferentOptionsOnConsumerAndProducerTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmDifferentOptionsOnConsumerAndProducerTest.java
 Fri Dec  2 10:56:36 2011
@@ -17,6 +17,7 @@
 package org.apache.camel.component.vm;
 
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
 import org.junit.Test;
 
 /**
@@ -26,11 +27,16 @@ public class VmDifferentOptionsOnConsume
 
     @Test
     public void testSendToVm() throws Exception {
-        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedBodiesReceived("Hello World");
+        
 
         template2.sendBody("direct:start", "Hello World");
 
         assertMockEndpointsSatisfied();
+        
+        // check the camel context of the exchange
+        assertEquals("Get a wrong context. ", context, result.getExchanges().get(0).getContext());
     }
 
     @Override


Reply via email to