Author: cschneider Date: Wed Jun 15 08:33:57 2011 New Revision: 1135955 URL: http://svn.apache.org/viewvc?rev=1135955&view=rev Log: Decoupling output stream from conduit
Added: camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelOutputStream.java (with props) Modified: camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java Modified: camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java?rev=1135955&r1=1135954&r2=1135955&view=diff ============================================================================== --- camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java (original) +++ camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java Wed Jun 15 08:33:57 2011 @@ -21,26 +21,18 @@ import java.io.OutputStream; import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; -import org.apache.camel.Exchange; -import org.apache.camel.ExchangePattern; import org.apache.camel.Producer; import org.apache.camel.ProducerTemplate; import org.apache.camel.RuntimeCamelException; -import org.apache.camel.component.cxf.common.header.CxfHeaderHelper; -import org.apache.camel.component.cxf.common.message.CxfMessageHelper; import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.util.ObjectHelper; import org.apache.cxf.Bus; import org.apache.cxf.common.logging.LogUtils; import org.apache.cxf.configuration.Configurable; import org.apache.cxf.configuration.Configurer; -import org.apache.cxf.io.CachedOutputStream; import org.apache.cxf.message.Message; import org.apache.cxf.service.model.EndpointInfo; import org.apache.cxf.transport.AbstractConduit; -import org.apache.cxf.transport.Conduit; -import org.apache.cxf.transport.Destination; -import org.apache.cxf.transport.MessageObserver; import org.apache.cxf.ws.addressing.EndpointReferenceType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -106,7 +98,12 @@ public class CamelConduit extends Abstra // prepare the message for send out , not actually send out the message public void prepare(Message message) throws IOException { LOG.trace("CamelConduit send message"); - message.setContent(OutputStream.class, new CamelOutputStream(message)); + CamelOutputStream os = new CamelOutputStream(this.targetCamelEndpointUri, + this.producer, + this.headerFilterStrategy, + this.getMessageObserver(), + message); + message.setContent(OutputStream.class, os); } public void close() { @@ -153,108 +150,4 @@ public class CamelConduit extends Abstra camelTemplate = template; } - private class CamelOutputStream extends CachedOutputStream { - private Message outMessage; - private boolean isOneWay; - - public CamelOutputStream(Message m) { - outMessage = m; - } - - protected void doFlush() throws IOException { - // do nothing here - } - - protected void doClose() throws IOException { - isOneWay = outMessage.getExchange().isOneWay(); - commitOutputMessage(); - } - - protected void onWrite() throws IOException { - // do nothing here - } - - - private void commitOutputMessage() throws IOException { - ExchangePattern pattern; - if (isOneWay) { - pattern = ExchangePattern.InOnly; - } else { - pattern = ExchangePattern.InOut; - } - LOG.debug("send the message to endpoint {}", targetCamelEndpointUri); - org.apache.camel.Exchange exchange = producer.createExchange(pattern); - - exchange.setProperty(Exchange.TO_ENDPOINT, targetCamelEndpointUri); - CachedOutputStream outputStream = (CachedOutputStream) outMessage.getContent(OutputStream.class); - // Send out the request message here, copy the protocolHeader back - CxfHeaderHelper.propagateCxfToCamel(headerFilterStrategy, outMessage, exchange.getIn().getHeaders(), exchange); - - // TODO support different encoding - exchange.getIn().setBody(outputStream.getInputStream()); - LOG.debug("template sending request: ", exchange.getIn()); - Exception exception; - try { - producer.process(exchange); - } catch (Exception ex) { - exception = ex; - } - // Throw the exception that the template get - exception = exchange.getException(); - if (exception != null) { - throw new IOException("Cannot send the request message.", exchange.getException()); - } - exchange.setProperty(CamelTransportConstants.CXF_EXCHANGE, outMessage.getExchange()); - if (!isOneWay) { - handleResponse(exchange); - } - - } - - private void handleResponse(org.apache.camel.Exchange exchange) throws IOException { - org.apache.cxf.message.Message inMessage = null; - try { - inMessage = CxfMessageHelper.getCxfInMessage(headerFilterStrategy, exchange, true); - } catch (Exception ex) { - throw new IOException("Cannot get the response message. ", ex); - } - incomingObserver.onMessage(inMessage); - } - } - - /** - * Represented decoupled response endpoint. - */ - // TODO: This class is not used - @Deprecated - protected class DecoupledDestination implements Destination { - protected MessageObserver decoupledMessageObserver; - private EndpointReferenceType address; - - DecoupledDestination(EndpointReferenceType ref, MessageObserver incomingObserver) { - address = ref; - decoupledMessageObserver = incomingObserver; - } - - public EndpointReferenceType getAddress() { - return address; - } - - public Conduit getBackChannel(Message inMessage, Message partialResponse, EndpointReferenceType addr) throws IOException { - // shouldn't be called on decoupled endpoint - return null; - } - - public void shutdown() { - } - - public synchronized void setMessageObserver(MessageObserver observer) { - decoupledMessageObserver = observer; - } - - public synchronized MessageObserver getMessageObserver() { - return decoupledMessageObserver; - } - } - } Added: camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelOutputStream.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelOutputStream.java?rev=1135955&view=auto ============================================================================== --- camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelOutputStream.java (added) +++ camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelOutputStream.java Wed Jun 15 08:33:57 2011 @@ -0,0 +1,100 @@ +package org.apache.camel.component.cxf.transport; + +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Producer; +import org.apache.camel.component.cxf.common.header.CxfHeaderHelper; +import org.apache.camel.component.cxf.common.message.CxfMessageHelper; +import org.apache.camel.spi.HeaderFilterStrategy; +import org.apache.cxf.io.CachedOutputStream; +import org.apache.cxf.message.Message; +import org.apache.cxf.transport.MessageObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class CamelOutputStream extends CachedOutputStream { + private static final Logger LOG = LoggerFactory.getLogger(CamelOutputStream.class); + + /** + * + */ + private Message outMessage; + private boolean isOneWay; + private String targetCamelEndpointUri; + private Producer producer; + private HeaderFilterStrategy headerFilterStrategy; + private MessageObserver observer; + + public CamelOutputStream(String targetCamelEndpointUri, Producer producer, + HeaderFilterStrategy headerFilterStrategy, MessageObserver observer, + Message m) { + this.targetCamelEndpointUri = targetCamelEndpointUri; + this.producer = producer; + this.headerFilterStrategy = headerFilterStrategy; + this.observer = observer; + outMessage = m; + } + + protected void doFlush() throws IOException { + // do nothing here + } + + protected void doClose() throws IOException { + isOneWay = outMessage.getExchange().isOneWay(); + commitOutputMessage(); + } + + protected void onWrite() throws IOException { + // do nothing here + } + + + private void commitOutputMessage() throws IOException { + ExchangePattern pattern; + if (isOneWay) { + pattern = ExchangePattern.InOnly; + } else { + pattern = ExchangePattern.InOut; + } + LOG.debug("send the message to endpoint {}", this.targetCamelEndpointUri); + org.apache.camel.Exchange exchange = this.producer.createExchange(pattern); + + exchange.setProperty(Exchange.TO_ENDPOINT, this.targetCamelEndpointUri); + CachedOutputStream outputStream = (CachedOutputStream) outMessage.getContent(OutputStream.class); + // Send out the request message here, copy the protocolHeader back + CxfHeaderHelper.propagateCxfToCamel(this.headerFilterStrategy, outMessage, exchange.getIn().getHeaders(), exchange); + + // TODO support different encoding + exchange.getIn().setBody(outputStream.getInputStream()); + LOG.debug("template sending request: ", exchange.getIn()); + Exception exception; + try { + this.producer.process(exchange); + } catch (Exception ex) { + exception = ex; + } + // Throw the exception that the template get + exception = exchange.getException(); + if (exception != null) { + throw new IOException("Cannot send the request message.", exchange.getException()); + } + exchange.setProperty(CamelTransportConstants.CXF_EXCHANGE, outMessage.getExchange()); + if (!isOneWay) { + handleResponse(exchange); + } + + } + + private void handleResponse(org.apache.camel.Exchange exchange) throws IOException { + org.apache.cxf.message.Message inMessage = null; + try { + inMessage = CxfMessageHelper.getCxfInMessage(this.headerFilterStrategy, exchange, true); + } catch (Exception ex) { + throw new IOException("Cannot get the response message. ", ex); + } + this.observer.onMessage(inMessage); + } +} \ No newline at end of file Propchange: camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelOutputStream.java ------------------------------------------------------------------------------ svn:mime-type = text/plain