Author: ningjiang
Date: Wed Apr  7 01:46:24 2010
New Revision: 931403

URL: http://svn.apache.org/viewvc?rev=931403&view=rev
Log:
CAMEL-2618 Do not use ProducerTemplate internally to send to same destination

Modified:
    camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java

Modified: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java?rev=931403&r1=931402&r2=931403&view=diff
==============================================================================
--- camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java (original)
+++ camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java Wed Apr  7 01:46:24 2010
@@ -22,9 +22,13 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
+import org.apache.camel.Producer;
 import org.apache.camel.ProducerTemplate;
+import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.component.cxf.CxfConstants;
 import org.apache.camel.component.cxf.util.CxfHeaderHelper;
 import org.apache.camel.component.cxf.util.CxfMessageHelper;
@@ -53,6 +57,7 @@ public class CamelConduit extends Abstra
     private CamelContext camelContext;
     private EndpointInfo endpointInfo;
     private String targetCamelEndpointUri;
+    private Producer producer;
     private ProducerTemplate camelTemplate;
     private Bus bus;
     private HeaderFilterStrategy headerFilterStrategy;
@@ -80,6 +85,13 @@ public class CamelConduit extends Abstra
         bus = b;
         initConfig();
         this.headerFilterStrategy = headerFilterStrategy;
+        Endpoint target = 
getCamelContext().getEndpoint(targetCamelEndpointUri);
+        try {
+            producer = target.createProducer();
+            producer.start();
+        } catch (Exception e) {
+            throw new RuntimeCamelException("Cannot create the producer 
rightly", e);
+        }
     }
 
     public void setCamelContext(CamelContext context) {
@@ -102,7 +114,12 @@ public class CamelConduit extends Abstra
 
     public void close() {
         getLogger().log(Level.FINE, "CamelConduit closed ");
-
+        // shutdown the producer
+        try {
+            producer.stop();
+        } catch (Exception e) {
+            getLogger().log(Level.WARNING, "CamelConduit producer stop with 
the exception", e);
+        }
     }
 
     protected Logger getLogger() {
@@ -126,6 +143,7 @@ public class CamelConduit extends Abstra
         }
     }
 
+    @Deprecated
     public ProducerTemplate getCamelTemplate() {
         if (camelTemplate == null) {            
             camelTemplate = getCamelContext().createProducerTemplate();
@@ -133,6 +151,7 @@ public class CamelConduit extends Abstra
         return camelTemplate;
     }
 
+    @Deprecated
     public void setCamelTemplate(ProducerTemplate template) {
         camelTemplate = template;
     }
@@ -167,23 +186,28 @@ public class CamelConduit extends Abstra
                 pattern = ExchangePattern.InOut;
             }
             getLogger().log(Level.FINE, "send the message to endpoint" + 
targetCamelEndpointUri);
-            // We could wait for the rely asynchronously
-            org.apache.camel.Exchange exchange = 
getCamelTemplate().send(targetCamelEndpointUri, pattern, new Processor() {
-                public void process(org.apache.camel.Exchange ex) throws 
IOException {
-                    CachedOutputStream outputStream = 
(CachedOutputStream)outMessage.getContent(OutputStream.class);
-                    // Send out the request message here, copy the 
protocolHeader back
-                    CxfHeaderHelper.propagateCxfToCamel(headerFilterStrategy, 
outMessage, ex.getIn().getHeaders(), ex);
- 
-                    // TODO support different encoding
-                    ex.getIn().setBody(outputStream.getBytes());
-                    getLogger().log(Level.FINE, "template sending request: ", 
ex.getIn());
-                }
-            });
-            exchange.setProperty(CxfConstants.CXF_EXCHANGE, 
outMessage.getExchange());
+            org.apache.camel.Exchange exchange = 
producer.createExchange(pattern);
+            
+            exchange.setProperty(Exchange.TO_ENDPOINT, targetCamelEndpointUri);
+            CachedOutputStream outputStream = 
(CachedOutputStream)outMessage.getContent(OutputStream.class);
+            // Send out the request message here, copy the protocolHeader back
+            CxfHeaderHelper.propagateCxfToCamel(headerFilterStrategy, 
outMessage, exchange.getIn().getHeaders(), exchange);
+
+            // TODO support different encoding
+            exchange.getIn().setBody(outputStream.getBytes());
+            getLogger().log(Level.FINE, "template sending request: ", 
exchange.getIn());
+            Exception exception = null;
+            try {
+                producer.process(exchange);
+            } catch (Exception ex) {
+                exception = ex;                
+            }
             // Throw the exception that the template get
-            if (exchange.getException() != null) {
+            exception = exchange.getException();            
+            if (exception != null) {
                 throw IOHelper.createIOException("Can't send the request 
message.", exchange.getException());
             }
+            exchange.setProperty(CxfConstants.CXF_EXCHANGE, 
outMessage.getExchange());
             if (!isOneWay) {
                 handleResponse(exchange);
             }


Reply via email to