Author: davsclaus Date: Thu May 6 14:38:10 2010 New Revision: 941756 URL: http://svn.apache.org/viewvc?rev=941756&view=rev Log: Added transferExchange option to camel-netty.
Added: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyPayloadHelper.java (with props) camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java (with props) camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTransferExchangeOptionTest.java (with props) Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaPayloadHelper.java camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTransferExchangeOptionTest.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaPayloadHelper.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaPayloadHelper.java?rev=941756&r1=941755&r2=941756&view=diff ============================================================================== --- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaPayloadHelper.java (original) +++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaPayloadHelper.java Thu May 6 14:38:10 2010 @@ -20,7 +20,7 @@ import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultExchangeHolder; /** - * Helper to get and set the correct payload when transfering data using camel-mina. + * Helper to get and set the correct payload when transferring data using camel-mina. * Always use this helper instead of direct access on the exchange object. * <p/> * This helper ensures that we can also transfer exchange objects over the wire using the Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java?rev=941756&r1=941755&r2=941756&view=diff ============================================================================== --- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java (original) +++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java Thu May 6 14:38:10 2010 @@ -243,7 +243,7 @@ public class MinaProducer extends Defaul @Override public void sessionClosed(IoSession session) throws Exception { - if (sync && message == null) { + if (sync && !messageReceived) { // sync=true (InOut mode) so we expected a message as reply but did not get one before the session is closed if (LOG.isDebugEnabled()) { LOG.debug("Session closed but no message received from address: " + this.endpoint.getAddress()); Modified: camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTransferExchangeOptionTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTransferExchangeOptionTest.java?rev=941756&r1=941755&r2=941756&view=diff ============================================================================== --- camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTransferExchangeOptionTest.java (original) +++ camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTransferExchangeOptionTest.java Thu May 6 14:38:10 2010 @@ -38,7 +38,7 @@ public class MinaTransferExchangeOptionT protected String uri = "mina:tcp://localhost:6321?sync=true&encoding=UTF-8&transferExchange=true"; - public void testMianTransferExchangeOptionWithoutException() throws Exception { + public void testMinaTransferExchangeOptionWithoutException() throws Exception { Exchange exchange = sendExchange(false); assertExchange(exchange, false); } Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=941756&r1=941755&r2=941756&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java Thu May 6 14:38:10 2010 @@ -59,6 +59,7 @@ public class NettyConfiguration { private String securityProvider; private boolean disconnect; private boolean lazyChannelCreation = true; + private boolean transferExchange; public NettyConfiguration() { setKeepAlive(true); @@ -148,6 +149,9 @@ public class NettyConfiguration { if (settings.containsKey("lazyChannelCreation")) { setLazyChannelCreation(Boolean.valueOf((String) settings.get("lazyChannelCreation"))); } + if (settings.containsKey("transferExchange")) { + setTransferExchange(Boolean.valueOf((String) settings.get("transferExchange"))); + } } public String getProtocol() { @@ -378,6 +382,14 @@ public class NettyConfiguration { this.lazyChannelCreation = lazyChannelCreation; } + public boolean isTransferExchange() { + return transferExchange; + } + + public void setTransferExchange(boolean transferExchange) { + this.transferExchange = transferExchange; + } + public String getAddress() { return host + ":" + port; } Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java?rev=941756&r1=941755&r2=941756&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java Thu May 6 14:38:10 2010 @@ -26,6 +26,7 @@ public final class NettyConstants { public static final String NETTY_CLOSE_CHANNEL_WHEN_COMPLETE = "CamelNettyCloseChannelWhenComplete"; public static final String NETTY_CHANNEL_HANDLER_CONTEXT = "CamelNettyChannelHandlerContext"; public static final String NETTY_MESSAGE_EVENT = "CamelNettyMessageEvent"; + public static final String NETTY_REMOTE_ADDRESS = "CamelNettyRemoteAddress"; private NettyConstants() { // Utility class Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java?rev=941756&r1=941755&r2=941756&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java Thu May 6 14:38:10 2010 @@ -45,7 +45,9 @@ public class NettyEndpoint extends Defau Exchange exchange = createExchange(); exchange.getIn().setHeader(NettyConstants.NETTY_CHANNEL_HANDLER_CONTEXT, ctx); exchange.getIn().setHeader(NettyConstants.NETTY_MESSAGE_EVENT, messageEvent); - return exchange; + exchange.getIn().setHeader(NettyConstants.NETTY_REMOTE_ADDRESS, messageEvent.getRemoteAddress()); + NettyPayloadHelper.setIn(exchange, messageEvent.getMessage()); + return exchange; } public boolean isSingleton() { Added: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyPayloadHelper.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyPayloadHelper.java?rev=941756&view=auto ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyPayloadHelper.java (added) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyPayloadHelper.java Thu May 6 14:38:10 2010 @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty; + +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultExchangeHolder; + +/** + * Helper to get and set the correct payload when transferring data using camel-netty. + * Always use this helper instead of direct access on the exchange object. + * <p/> + * This helper ensures that we can also transfer exchange objects over the wire using the + * <tt>transferExchange=true</tt> option. + * + * @version $Revision$ + */ +public final class NettyPayloadHelper { + + public static Object getIn(NettyEndpoint endpoint, Exchange exchange) { + if (endpoint.getConfiguration().isTransferExchange()) { + // we should transfer the entire exchange over the wire (includes in/out) + return DefaultExchangeHolder.marshal(exchange); + } else { + // normal transfer using the body only + return exchange.getIn().getBody(); + } + } + + public static Object getOut(NettyEndpoint endpoint, Exchange exchange) { + if (endpoint.getConfiguration().isTransferExchange()) { + // we should transfer the entire exchange over the wire (includes in/out) + return DefaultExchangeHolder.marshal(exchange); + } else { + // normal transfer using the body only + return exchange.getOut().getBody(); + } + } + + public static void setIn(Exchange exchange, Object payload) { + if (payload instanceof DefaultExchangeHolder) { + DefaultExchangeHolder.unmarshal(exchange, (DefaultExchangeHolder) payload); + } else { + // normal transfer using the body only + exchange.getIn().setBody(payload); + } + } + + public static void setOut(Exchange exchange, Object payload) { + if (payload instanceof DefaultExchangeHolder) { + DefaultExchangeHolder.unmarshal(exchange, (DefaultExchangeHolder) payload); + } else { + // normal transfer using the body only and preserve the headers + exchange.getOut().setHeaders(exchange.getIn().getHeaders()); + exchange.getOut().setBody(payload); + } + } + +} Propchange: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyPayloadHelper.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyPayloadHelper.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=941756&r1=941755&r2=941756&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Thu May 6 14:38:10 2010 @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import org.apache.camel.CamelContext; import org.apache.camel.CamelException; +import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.ExchangeTimedOutException; import org.apache.camel.ServicePoolAware; @@ -108,20 +109,50 @@ public class NettyProducer extends Defau openConnection(); } + Object body = NettyPayloadHelper.getIn(getEndpoint(), exchange); + if (body == null) { + LOG.warn("No payload to send for exchange: " + exchange); + return; // exit early since nothing to write + } + if (configuration.isSync()) { + // only initialize latch if we should get a response countdownLatch = new CountDownLatch(1); } + // log what we are writing + if (LOG.isDebugEnabled()) { + Object out = body; + if (body instanceof byte[]) { + // byte arrays is not readable so convert to string + out = exchange.getContext().getTypeConverter().convertTo(String.class, body); + } + LOG.debug("Writing body : " + out); + } + // write the body - NettyHelper.writeBody(channel, null, exchange.getIn().getBody(), exchange); + NettyHelper.writeBody(channel, null, body, exchange); if (configuration.isSync()) { boolean success = countdownLatch.await(configuration.getReceiveTimeoutMillis(), TimeUnit.MILLISECONDS); if (!success) { throw new ExchangeTimedOutException(exchange, configuration.getReceiveTimeoutMillis()); } - Object response = ((ClientChannelHandler) clientPipeline.get("handler")).getResponse(); - exchange.getOut().setBody(response); + + ClientChannelHandler handler = (ClientChannelHandler) clientPipeline.get("handler"); + if (handler.getCause() != null) { + throw new CamelExchangeException("Error occurred in ClientChannelHandler", exchange, handler.getCause()); + } else if (!handler.isMessageReceived()) { + // no message received + throw new CamelExchangeException("No response received from remote server: " + configuration.getAddress(), exchange); + } else { + // set the result on either IN or OUT on the original exchange depending on its pattern + if (ExchangeHelper.isOutCapable(exchange)) { + NettyPayloadHelper.setOut(exchange, handler.getMessage()); + } else { + NettyPayloadHelper.setIn(exchange, handler.getMessage()); + } + } } // should channel be closed after complete? Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java?rev=941756&r1=941755&r2=941756&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java Thu May 6 14:38:10 2010 @@ -16,6 +16,8 @@ */ package org.apache.camel.component.netty.handlers; +import java.util.concurrent.CountDownLatch; + import org.apache.camel.CamelException; import org.apache.camel.component.netty.NettyHelper; import org.apache.camel.component.netty.NettyProducer; @@ -32,13 +34,21 @@ import org.jboss.netty.channel.SimpleCha public class ClientChannelHandler extends SimpleChannelUpstreamHandler { private static final transient Log LOG = LogFactory.getLog(ClientChannelHandler.class); private NettyProducer producer; - private Object response; - + private Object message; + private Throwable cause; + private boolean messageReceived; + public ClientChannelHandler(NettyProducer producer) { super(); this.producer = producer; } + public void reset() { + this.message = null; + this.cause = null; + this.messageReceived = false; + } + @Override public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent channelStateEvent) throws Exception { // to keep track of open sockets @@ -47,35 +57,59 @@ public class ClientChannelHandler extend @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent) throws Exception { + this.message = null; + this.messageReceived = false; + this.cause = exceptionEvent.getCause(); + if (LOG.isDebugEnabled()) { - LOG.debug("Closing channel as an exception was thrown from Netty", exceptionEvent.getCause()); + LOG.debug("Closing channel as an exception was thrown from Netty", cause); } // close channel in case an exception was thrown NettyHelper.close(exceptionEvent.getChannel()); + } - // must wrap and rethrow since cause can be of Throwable and we must only throw Exception - throw new CamelException(exceptionEvent.getCause()); + @Override + public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { + if (producer.getConfiguration().isSync() && !messageReceived) { + // sync=true (InOut mode) so we expected a message as reply but did not get one before the session is closed + if (LOG.isDebugEnabled()) { + LOG.debug("Channel closed but no message received from address: " + producer.getConfiguration().getAddress()); + } + // session was closed but no message received. This could be because the remote server had an internal error + // and could not return a response. We should count down to stop waiting for a response + countDown(); + } } @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent) throws Exception { - setResponse(messageEvent.getMessage()); + message = messageEvent.getMessage(); + messageReceived = true; + cause = null; if (LOG.isDebugEnabled()) { - LOG.debug("Incoming message:" + response); + LOG.debug("Message received: " + message); } + // signal we have received message + countDown(); + } + + protected void countDown() { if (producer.getConfiguration().isSync()) { producer.getCountdownLatch().countDown(); - } + } + } + + public Object getMessage() { + return message; } - public Object getResponse() { - return response; + public boolean isMessageReceived() { + return messageReceived; } - public void setResponse(Object response) { - this.response = response; + public Throwable getCause() { + return cause; } - } Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java?rev=941756&r1=941755&r2=941756&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java Thu May 6 14:38:10 2010 @@ -22,6 +22,7 @@ import org.apache.camel.ExchangePattern; import org.apache.camel.component.netty.NettyConstants; import org.apache.camel.component.netty.NettyConsumer; import org.apache.camel.component.netty.NettyHelper; +import org.apache.camel.component.netty.NettyPayloadHelper; import org.apache.camel.util.ExchangeHelper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -76,7 +77,6 @@ public class ServerChannelHandler extend if (consumer.getConfiguration().isSync()) { exchange.setPattern(ExchangePattern.InOut); } - exchange.getIn().setBody(in); try { consumer.getProcessor().process(exchange); @@ -93,17 +93,18 @@ public class ServerChannelHandler extend private void sendResponse(MessageEvent messageEvent, Exchange exchange) throws Exception { Object body; if (ExchangeHelper.isOutCapable(exchange)) { - body = exchange.getOut().getBody(); + body = NettyPayloadHelper.getOut(consumer.getEndpoint(), exchange); } else { - body = exchange.getIn().getBody(); + body = NettyPayloadHelper.getIn(consumer.getEndpoint(), exchange); } - if (exchange.isFailed()) { - if (exchange.getException() == null) { - // fault detected - body = exchange.getOut().getBody(); - } else { + boolean failed = exchange.isFailed(); + if (failed && !consumer.getEndpoint().getConfiguration().isTransferExchange()) { + if (exchange.getException() != null) { body = exchange.getException(); + } else { + // failed and no exception, must be a fault + body = exchange.getOut().getBody(); } } Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java?rev=941756&view=auto ============================================================================== --- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java (added) +++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java Thu May 6 14:38:10 2010 @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty; + +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +/** + * @version $Revision$ + */ +public class NettyInOutWithForcedNoResponseTest extends CamelTestSupport { + + @Test + public void testResponse() throws Exception { + Object out = template.requestBody("netty:tcp://localhost:4444", "Copenhagen"); + assertEquals("Hello Claus", out); + } + + @Test + public void testNoResponse() throws Exception { + try { + template.requestBody("netty:tcp://localhost:4444", "London"); + fail("Should throw an exception"); + } catch (RuntimeCamelException e) { + assertTrue(e.getCause().getMessage().startsWith("No response")); + } + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + from("netty:tcp://localhost:4444") + .choice() + .when(body().isEqualTo("Copenhagen")).transform(constant("Hello Claus")) + .otherwise().transform(constant(null)); + } + }; + } +} Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTransferExchangeOptionTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTransferExchangeOptionTest.java?rev=941756&view=auto ============================================================================== --- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTransferExchangeOptionTest.java (added) +++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTransferExchangeOptionTest.java Thu May 6 14:38:10 2010 @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty; + +import java.nio.charset.Charset; + +import junit.framework.Assert; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +/** + * @version $Revision$ + */ +public class NettyTransferExchangeOptionTest extends CamelTestSupport { + + protected String uri = "netty:tcp://localhost:6321?transferExchange=true"; + + @Test + public void testNettyTransferExchangeOptionWithoutException() throws Exception { + Exchange exchange = sendExchange(false); + assertExchange(exchange, false); + } + + @Test + public void testNettyTransferExchangeOptionWithException() throws Exception { + Exchange exchange = sendExchange(true); + assertExchange(exchange, true); + } + + private Exchange sendExchange(boolean setException) throws Exception { + Endpoint endpoint = context.getEndpoint(uri); + Exchange exchange = endpoint.createExchange(); + + Message message = exchange.getIn(); + message.setBody("Hello!"); + message.setHeader("cheese", "feta"); + exchange.setProperty("ham", "old"); + exchange.setProperty("setException", setException); + + Producer producer = endpoint.createProducer(); + producer.start(); + producer.process(exchange); + + return exchange; + } + + private void assertExchange(Exchange exchange, boolean hasFault) { + if (!hasFault) { + Message out = exchange.getOut(); + assertNotNull(out); + assertFalse(out.isFault()); + assertEquals("Goodbye!", out.getBody()); + assertEquals("cheddar", out.getHeader("cheese")); + } else { + Message fault = exchange.getOut(); + assertNotNull(fault); + assertTrue(fault.isFault()); + assertNotNull(fault.getBody()); + assertTrue("Should get the InterrupteException exception", fault.getBody() instanceof InterruptedException); + assertEquals("nihao", fault.getHeader("hello")); + } + + + // in should stay the same + Message in = exchange.getIn(); + assertNotNull(in); + assertEquals("Hello!", in.getBody()); + assertEquals("feta", in.getHeader("cheese")); + // however the shared properties have changed + assertEquals("fresh", exchange.getProperty("salami")); + assertNull(exchange.getProperty("Charset")); + } + + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + from(uri).process(new Processor() { + public void process(Exchange e) throws InterruptedException { + Assert.assertNotNull(e.getIn().getBody()); + Assert.assertNotNull(e.getIn().getHeaders()); + Assert.assertNotNull(e.getProperties()); + Assert.assertEquals("Hello!", e.getIn().getBody()); + Assert.assertEquals("feta", e.getIn().getHeader("cheese")); + Assert.assertEquals("old", e.getProperty("ham")); + Assert.assertEquals(ExchangePattern.InOut, e.getPattern()); + Boolean setException = (Boolean) e.getProperty("setException"); + + if (setException) { + e.getOut().setFault(true); + e.getOut().setBody(new InterruptedException()); + e.getOut().setHeader("hello", "nihao"); + } else { + e.getOut().setBody("Goodbye!"); + e.getOut().setHeader("cheese", "cheddar"); + } + e.setProperty("salami", "fresh"); + e.setProperty("Charset", Charset.defaultCharset()); + } + }); + } + }; + } +} + + Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTransferExchangeOptionTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTransferExchangeOptionTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date