Author: ningjiang
Date: Wed Apr 7 01:46:24 2010
New Revision: 931403

URL: http://svn.apache.org/viewvc?rev=931403&view=rev
Log:
CAMEL-2618 Do not use ProducerTemplate internally to send to same destination

Modified:
    camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java

Modified: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java?rev=931403&r1=931402&r2=931403&view=diff
==============================================================================
--- camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java (original)
+++ camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java Wed Apr 7 01:46:24 2010
@@ -22,9 +22,13 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
+import org.apache.camel.Producer;
 import org.apache.camel.ProducerTemplate;
+import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.component.cxf.CxfConstants;
 import org.apache.camel.component.cxf.util.CxfHeaderHelper;
 import org.apache.camel.component.cxf.util.CxfMessageHelper;
@@ -53,6 +57,7 @@ public class CamelConduit extends Abstra
     private CamelContext camelContext;
     private EndpointInfo endpointInfo;
     private String targetCamelEndpointUri;
+    private Producer producer;
     private ProducerTemplate camelTemplate;
     private Bus bus;
     private HeaderFilterStrategy headerFilterStrategy;
@@ -80,6 +85,13 @@ public class CamelConduit extends Abstra
         bus = b;
         initConfig();
         this.headerFilterStrategy = headerFilterStrategy;
+        Endpoint target = getCamelContext().getEndpoint(targetCamelEndpointUri);
+        try {
+            producer = target.createProducer();
+            producer.start();
+        } catch (Exception e) {
+            throw new RuntimeCamelException("Cannot create the producer rightly", e);
+        }
     }
 
     public void setCamelContext(CamelContext context) {
@@ -102,7 +114,12 @@ public class CamelConduit extends Abstra
 
     public void close() {
         getLogger().log(Level.FINE, "CamelConduit closed ");
-
+        // shutdown the producer
+        try {
+            producer.stop();
+        } catch (Exception e) {
+            getLogger().log(Level.WARNING, "CamelConduit producer stop with the exception", e);
+        }
     }
 
     protected Logger getLogger() {
@@ -126,6 +143,7 @@ public class CamelConduit extends Abstra
         }
     }
 
+    @Deprecated
     public ProducerTemplate getCamelTemplate() {
         if (camelTemplate == null) {
             camelTemplate = getCamelContext().createProducerTemplate();
@@ -133,6 +151,7 @@ public class CamelConduit extends Abstra
         return camelTemplate;
     }
 
+    @Deprecated
     public void setCamelTemplate(ProducerTemplate template) {
         camelTemplate = template;
     }
@@ -167,23 +186,28 @@ public class CamelConduit extends Abstra
                 pattern = ExchangePattern.InOut;
             }
             getLogger().log(Level.FINE, "send the message to endpoint" + targetCamelEndpointUri);
-            // We could wait for the rely asynchronously
-            org.apache.camel.Exchange exchange = getCamelTemplate().send(targetCamelEndpointUri, pattern, new Processor() {
-                public void process(org.apache.camel.Exchange ex) throws IOException {
-                    CachedOutputStream outputStream = (CachedOutputStream)outMessage.getContent(OutputStream.class);
-                    // Send out the request message here, copy the protocolHeader back
-                    CxfHeaderHelper.propagateCxfToCamel(headerFilterStrategy, outMessage, ex.getIn().getHeaders(), ex);
-
-                    // TODO support different encoding
-                    ex.getIn().setBody(outputStream.getBytes());
-                    getLogger().log(Level.FINE, "template sending request: ", ex.getIn());
-                }
-            });
-            exchange.setProperty(CxfConstants.CXF_EXCHANGE, outMessage.getExchange());
+            org.apache.camel.Exchange exchange = producer.createExchange(pattern);
+
+            exchange.setProperty(Exchange.TO_ENDPOINT, targetCamelEndpointUri);
+            CachedOutputStream outputStream = (CachedOutputStream)outMessage.getContent(OutputStream.class);
+            // Send out the request message here, copy the protocolHeader back
+            CxfHeaderHelper.propagateCxfToCamel(headerFilterStrategy, outMessage, exchange.getIn().getHeaders(), exchange);
+
+            // TODO support different encoding
+            exchange.getIn().setBody(outputStream.getBytes());
+            getLogger().log(Level.FINE, "template sending request: ", exchange.getIn());
+            Exception exception = null;
+            try {
+                producer.process(exchange);
+            } catch (Exception ex) {
+                exception = ex;
+            }
             // Throw the exception that the template get
-            if (exchange.getException() != null) {
+            exception = exchange.getException();
+            if (exception != null) {
                 throw IOHelper.createIOException("Can't send the request message.", exchange.getException());
             }
+            exchange.setProperty(CxfConstants.CXF_EXCHANGE, outMessage.getExchange());
             if (!isOneWay) {
                 handleResponse(exchange);
             }