Author: davsclaus Date: Sat May 15 17:32:15 2010 New Revision: 944686 URL: http://svn.apache.org/viewvc?rev=944686&view=rev Log: CAMEL-2721: Added options on mina/netty to control what to do in consumer if no reply to send back.
Removed: camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaMulticastTest.java Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConfiguration.java camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaInOutWithForcedNoResponseTest.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConfiguration.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConfiguration.java?rev=944686&r1=944685&r2=944686&view=diff ============================================================================== --- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConfiguration.java (original) +++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConfiguration.java Sat May 15 17:32:15 2010 @@ -19,6 +19,7 @@ package org.apache.camel.component.mina; import java.nio.charset.Charset; import java.util.List; +import org.apache.camel.LoggingLevel; import org.apache.camel.RuntimeCamelException; import org.apache.mina.common.IoFilter; import org.apache.mina.filter.codec.ProtocolCodecFactory; @@ -44,6 +45,8 @@ public class MinaConfiguration implement private List<IoFilter> filters; private boolean allowDefaultCodec = true; private boolean disconnect; + private boolean disconnectOnNoReply = true; + private LoggingLevel noReplyLogLevel = LoggingLevel.WARN; /** * Returns a copy of this configuration @@ -189,7 +192,7 @@ public class MinaConfiguration implement } public boolean isDatagramProtocol() { - return protocol.equals("udp") || protocol.equals("mcast") || protocol.equals("multicast"); + return protocol.equals("udp"); } public void setAllowDefaultCodec(boolean allowDefaultCodec) { @@ -207,4 +210,20 @@ public class MinaConfiguration implement public void setDisconnect(boolean disconnect) { this.disconnect = disconnect; } + + public boolean isDisconnectOnNoReply() { + return disconnectOnNoReply; + } + + public void setDisconnectOnNoReply(boolean disconnectOnNoReply) { + this.disconnectOnNoReply = disconnectOnNoReply; + } + + public LoggingLevel getNoReplyLogLevel() { + return noReplyLogLevel; + } + + public void setNoReplyLogLevel(LoggingLevel noReplyLogLevel) { + this.noReplyLogLevel = noReplyLogLevel; + } } Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java?rev=944686&r1=944685&r2=944686&view=diff ============================================================================== --- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java (original) +++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java Sat May 15 17:32:15 2010 @@ -22,6 +22,7 @@ import org.apache.camel.CamelException; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.impl.DefaultConsumer; +import org.apache.camel.processor.Logger; import org.apache.camel.util.ExchangeHelper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -42,6 +43,7 @@ public class MinaConsumer extends Defaul private final SocketAddress address; private final IoAcceptor acceptor; private boolean sync; + private Logger noReplyLogger; public MinaConsumer(final MinaEndpoint endpoint, Processor processor) { super(endpoint, processor); @@ -49,6 +51,7 @@ public class MinaConsumer extends Defaul this.address = endpoint.getAddress(); this.acceptor = endpoint.getAcceptor(); this.sync = endpoint.getConfiguration().isSync(); + this.noReplyLogger = new Logger(LOG, endpoint.getConfiguration().getNoReplyLogLevel()); } @Override @@ -132,10 +135,15 @@ public class MinaConsumer extends Defaul } if (body == null) { - // must close session if no data to write otherwise client will never receive a response - // and wait forever (if not timing out) - LOG.warn("Cannot write body since its null, closing session: " + exchange); - session.close(); + noReplyLogger.log("No payload to send as reply for exchange: " + exchange); + if (endpoint.getConfiguration().isDisconnectOnNoReply()) { + // must close session if no data to write otherwise client will never receive a response + // and wait forever (if not timing out) + if (LOG.isDebugEnabled()) { + LOG.debug("Closing session as no payload to send as reply at address: " + address); + } + session.close(); + } } else { // we got a response to write if (LOG.isDebugEnabled()) { Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java?rev=944686&r1=944685&r2=944686&view=diff ============================================================================== --- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java (original) +++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java Sat May 15 17:32:15 2010 @@ -25,6 +25,7 @@ import org.apache.camel.Exchange; import org.apache.camel.ExchangeTimedOutException; import org.apache.camel.ServicePoolAware; import org.apache.camel.impl.DefaultProducer; +import org.apache.camel.processor.Logger; import org.apache.camel.util.ExchangeHelper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -49,6 +50,7 @@ public class MinaProducer extends Defaul private long timeout; private IoConnector connector; private boolean sync; + private Logger noReplyLogger; public MinaProducer(MinaEndpoint endpoint) { super(endpoint); @@ -56,6 +58,7 @@ public class MinaProducer extends Defaul this.lazySessionCreation = endpoint.getConfiguration().isLazySessionCreation(); this.timeout = endpoint.getConfiguration().getTimeout(); this.sync = endpoint.getConfiguration().isSync(); + this.noReplyLogger = new Logger(LOG, endpoint.getConfiguration().getNoReplyLogLevel()); } @Override @@ -80,7 +83,7 @@ public class MinaProducer extends Defaul Object body = MinaPayloadHelper.getIn(endpoint, exchange); if (body == null) { - LOG.warn("No payload to send for exchange: " + exchange); + noReplyLogger.log("No payload to send for exchange: " + exchange); return; // exit early since nothing to write } Modified: camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaInOutWithForcedNoResponseTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaInOutWithForcedNoResponseTest.java?rev=944686&r1=944685&r2=944686&view=diff ============================================================================== --- camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaInOutWithForcedNoResponseTest.java (original) +++ camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaInOutWithForcedNoResponseTest.java Sat May 15 17:32:15 2010 @@ -17,6 +17,7 @@ package org.apache.camel.component.mina; import org.apache.camel.ContextTestSupport; +import org.apache.camel.ExchangeTimedOutException; import org.apache.camel.RuntimeCamelException; import org.apache.camel.builder.RouteBuilder; @@ -39,12 +40,26 @@ public class MinaInOutWithForcedNoRespon } } + public void testNoResponseDisconnectOnNoReplyFalse() throws Exception { + try { + template.requestBody("mina:tcp://localhost:4445?sync=true&timeout=3000", "London"); + fail("Should throw an exception"); + } catch (RuntimeCamelException e) { + assertIsInstanceOf(ExchangeTimedOutException.class, e.getCause()); + } + } + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { public void configure() throws Exception { from("mina:tcp://localhost:4444?sync=true") - .choice() + .choice() + .when(body().isEqualTo("Copenhagen")).transform(constant("Hello Claus")) + .otherwise().transform(constant(null)); + + from("mina:tcp://localhost:4445?sync=true&disconnectOnNoReply=false&noReplyLogLevel=OFF") + .choice() .when(body().isEqualTo("Copenhagen")).transform(constant("Hello Claus")) .otherwise().transform(constant(null)); } Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=944686&r1=944685&r2=944686&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java Sat May 15 17:32:15 2010 @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import org.apache.camel.LoggingLevel; import org.apache.camel.RuntimeCamelException; import org.apache.camel.util.URISupport; import org.jboss.netty.channel.ChannelDownstreamHandler; @@ -61,6 +62,8 @@ public class NettyConfiguration implemen private boolean disconnect; private boolean lazyChannelCreation = true; private boolean transferExchange; + private boolean disconnectOnNoReply = true; + private LoggingLevel noReplyLogLevel = LoggingLevel.WARN; /** * Returns a copy of this configuration @@ -154,6 +157,12 @@ public class NettyConfiguration implemen if (settings.containsKey("transferExchange")) { setTransferExchange(Boolean.valueOf((String) settings.get("transferExchange"))); } + if (settings.containsKey("disconnectOnNoReply")) { + setDisconnectOnNoReply(Boolean.valueOf((String) settings.get("disconnectOnNoReply"))); + } + if (settings.containsKey("noReplyLogLevel")) { + setNoReplyLogLevel(LoggingLevel.valueOf((String) settings.get("noReplyLogLevel"))); + } } public String getProtocol() { @@ -392,6 +401,22 @@ public class NettyConfiguration implemen this.transferExchange = transferExchange; } + public boolean isDisconnectOnNoReply() { + return disconnectOnNoReply; + } + + public void setDisconnectOnNoReply(boolean disconnectOnNoReply) { + this.disconnectOnNoReply = disconnectOnNoReply; + } + + public LoggingLevel getNoReplyLogLevel() { + return noReplyLogLevel; + } + + public void setNoReplyLogLevel(LoggingLevel noReplyLogLevel) { + this.noReplyLogLevel = noReplyLogLevel; + } + public String getAddress() { return host + ":" + port; } Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=944686&r1=944685&r2=944686&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Sat May 15 17:32:15 2010 @@ -29,6 +29,7 @@ import org.apache.camel.ExchangeTimedOut import org.apache.camel.ServicePoolAware; import org.apache.camel.component.netty.handlers.ClientChannelHandler; import org.apache.camel.impl.DefaultProducer; +import org.apache.camel.processor.Logger; import org.apache.camel.util.ExchangeHelper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -58,12 +59,14 @@ public class NettyProducer extends Defau private ConnectionlessBootstrap connectionlessClientBootstrap; private ClientPipelineFactory clientPipelineFactory; private ChannelPipeline clientPipeline; + private Logger noReplyLogger; public NettyProducer(NettyEndpoint nettyEndpoint, NettyConfiguration configuration) { super(nettyEndpoint); this.configuration = configuration; this.context = this.getEndpoint().getCamelContext(); this.allChannels = new DefaultChannelGroup("NettyProducer-" + nettyEndpoint.getEndpointUri()); + this.noReplyLogger = new Logger(LOG, configuration.getNoReplyLogLevel()); } @Override @@ -111,7 +114,7 @@ public class NettyProducer extends Defau Object body = NettyPayloadHelper.getIn(getEndpoint(), exchange); if (body == null) { - LOG.warn("No payload to send for exchange: " + exchange); + noReplyLogger.log("No payload to send for exchange: " + exchange); return; // exit early since nothing to write } Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java?rev=944686&r1=944685&r2=944686&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java Sat May 15 17:32:15 2010 @@ -23,6 +23,7 @@ import org.apache.camel.component.netty. import org.apache.camel.component.netty.NettyConsumer; import org.apache.camel.component.netty.NettyHelper; import org.apache.camel.component.netty.NettyPayloadHelper; +import org.apache.camel.processor.Logger; import org.apache.camel.util.ExchangeHelper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,10 +38,12 @@ import org.jboss.netty.channel.SimpleCha public class ServerChannelHandler extends SimpleChannelUpstreamHandler { private static final transient Log LOG = LogFactory.getLog(ServerChannelHandler.class); private NettyConsumer consumer; - + private Logger noReplyLogger; + public ServerChannelHandler(NettyConsumer consumer) { super(); this.consumer = consumer; + this.noReplyLogger = new Logger(LOG, consumer.getConfiguration().getNoReplyLogLevel()); } @Override @@ -110,10 +113,15 @@ public class ServerChannelHandler extend } if (body == null) { - // must close session if no data to write otherwise client will never receive a response - // and wait forever (if not timing out) - LOG.warn("Cannot write body since its null, closing channel: " + exchange); - NettyHelper.close(messageEvent.getChannel()); + noReplyLogger.log("No payload to send as reply for exchange: " + exchange); + if (consumer.getConfiguration().isDisconnectOnNoReply()) { + // must close session if no data to write otherwise client will never receive a response + // and wait forever (if not timing out) + if (LOG.isDebugEnabled()) { + LOG.debug("Closing channel as no payload to send as reply at address: " + messageEvent.getRemoteAddress()); + } + NettyHelper.close(messageEvent.getChannel()); + } } else { // we got a body to write if (LOG.isDebugEnabled()) { Modified: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java?rev=944686&r1=944685&r2=944686&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java (original) +++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java Sat May 15 17:32:15 2010 @@ -16,6 +16,7 @@ */ package org.apache.camel.component.netty; +import org.apache.camel.ExchangeTimedOutException; import org.apache.camel.RuntimeCamelException; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.test.junit4.CamelTestSupport; @@ -42,6 +43,16 @@ public class NettyInOutWithForcedNoRespo } } + @Test + public void testNoResponseDisconnectOnNoReplyFalse() throws Exception { + try { + template.requestBody("netty:tcp://localhost:4445?sync=true&timeout=3000", "London"); + fail("Should throw an exception"); + } catch (RuntimeCamelException e) { + assertIsInstanceOf(ExchangeTimedOutException.class, e.getCause()); + } + } + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @@ -50,6 +61,11 @@ public class NettyInOutWithForcedNoRespo .choice() .when(body().isEqualTo("Copenhagen")).transform(constant("Hello Claus")) .otherwise().transform(constant(null)); + + from("netty:tcp://localhost:4445?sync=true&disconnectOnNoReply=false&noReplyLogLevel=OFF") + .choice() + .when(body().isEqualTo("Copenhagen")).transform(constant("Hello Claus")) + .otherwise().transform(constant(null)); } }; }