Author: davsclaus Date: Wed Jun 13 11:39:45 2012 New Revision: 1349766 URL: http://svn.apache.org/viewvc?rev=1349766&view=rev Log: CAMEL-5151: Fixed camel-netty to support proxy use cases.
Added: camel/branches/camel-2.9.x/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyProxyTest.java - copied unchanged from r1349765, camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyProxyTest.java Modified: camel/branches/camel-2.9.x/ (props changed) camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java Propchange: camel/branches/camel-2.9.x/ ------------------------------------------------------------------------------ Merged /camel/trunk:r1349765 Propchange: camel/branches/camel-2.9.x/ ------------------------------------------------------------------------------ Binary property 'svnmerge-integrated' - no diff available. Modified: camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java?rev=1349766&r1=1349765&r2=1349766&view=diff ============================================================================== --- camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java (original) +++ camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java Wed Jun 13 11:39:45 2012 @@ -18,11 +18,11 @@ package org.apache.camel.component.netty import java.net.SocketAddress; -import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.NoTypeConversionAvailableException; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,16 +73,16 @@ public final class NettyHelper { } /** - * Writes the given body to Netty channel. Will wait until the body has been written. + * Writes the given body to Netty channel. Will <b>not</b >wait until the body has been written. * * @param channel the Netty channel * @param remoteAddress the remote address when using UDP * @param body the body to write (send) * @param exchange the exchange - * @throws CamelExchangeException is thrown if the body could not be written for some reasons - * (eg remote connection is closed etc.) + * @param listener listener with work to be executed when the operation is complete */ - public static void writeBodySync(Channel channel, SocketAddress remoteAddress, Object body, Exchange exchange) throws CamelExchangeException { + public static void writeBodyAsync(Channel channel, SocketAddress remoteAddress, Object body, + Exchange exchange, ChannelFutureListener listener) { // the write operation is asynchronous. Use future to wait until the session has been written ChannelFuture future; if (remoteAddress != null) { @@ -91,15 +91,7 @@ public final class NettyHelper { future = channel.write(body); } - // wait for the write - LOG.trace("Waiting for write to complete"); - future.awaitUninterruptibly(); - - // if it was not a success then thrown an exception - if (!future.isSuccess()) { - LOG.warn("Cannot write body: " + body + " using channel: " + channel); - throw new CamelExchangeException("Cannot write body", exchange, future.getCause()); - } + future.addListener(listener); } /** Modified: camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=1349766&r1=1349765&r2=1349766&view=diff ============================================================================== --- camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original) +++ camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Wed Jun 13 11:39:45 2012 @@ -17,6 +17,7 @@ package org.apache.camel.component.netty; import java.net.InetSocketAddress; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; @@ -172,6 +173,7 @@ public class NettyProducer extends Defau // allow to reuse channel, on this producer, to avoid creating a new connection // for each message being sent if (channelFuture == null || channel == null || !channel.isOpen()) { + channel = null; channelFuture = openConnection(); channel = openChannel(channelFuture); } @@ -277,6 +279,7 @@ public class NettyProducer extends Defau // set the pipeline factory, which creates the pipeline for each newly created channels clientBootstrap.setPipelineFactory(pipelineFactory); answer = clientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort())); + LOG.trace("Created new TCP client bootstrap connecting to {}:{}", configuration.getHost(), configuration.getPort()); return answer; } else { ConnectionlessBootstrap connectionlessClientBootstrap = new ConnectionlessBootstrap(datagramChannelFactory); @@ -294,17 +297,31 @@ public class NettyProducer extends Defau Channel channel = connectionlessClientBootstrap.bind(new InetSocketAddress(0)); ALL_CHANNELS.add(channel); answer = connectionlessClientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort())); + LOG.trace("Created new UDP client bootstrap connecting to {}:{}", configuration.getHost(), configuration.getPort()); return answer; } } private Channel openChannel(ChannelFuture channelFuture) throws Exception { - // wait until we got connection - channelFuture.awaitUninterruptibly(); + // wait until until the operation is complete + final CountDownLatch latch = new CountDownLatch(1); + channelFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture channelFuture) throws Exception { + LOG.debug("Operation complete {}", channelFuture); + latch.countDown(); + } + }); + // blocking for channel to be done + LOG.trace("Waiting for operation to complete {}", channelFuture); + latch.await(); + if (!channelFuture.isSuccess()) { + // clear channel as we did not connect + channel = null; throw new CamelException("Cannot connect to " + configuration.getAddress(), channelFuture.getCause()); } - Channel channel = channelFuture.getChannel(); + channel = channelFuture.getChannel(); // to keep track of all channels in use ALL_CHANNELS.add(channel); Modified: camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java?rev=1349766&r1=1349765&r2=1349766&view=diff ============================================================================== --- camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java (original) +++ camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java Wed Jun 13 11:39:45 2012 @@ -16,6 +16,9 @@ */ package org.apache.camel.component.netty.handlers; +import java.net.SocketAddress; + +import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.component.netty.NettyConstants; @@ -25,6 +28,8 @@ import org.apache.camel.component.netty. import org.apache.camel.util.CamelLogger; import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.IOHelper; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ExceptionEvent; @@ -132,31 +137,56 @@ public class ServerChannelHandler extend // we got a body to write LOG.debug("Writing body: {}", body); + ChannelFutureListener listener = new ResponseFutureListener(exchange, messageEvent.getRemoteAddress()); if (consumer.getConfiguration().isTcp()) { - NettyHelper.writeBodySync(messageEvent.getChannel(), null, body, exchange); + NettyHelper.writeBodyAsync(messageEvent.getChannel(), null, body, exchange, listener); } else { - NettyHelper.writeBodySync(messageEvent.getChannel(), messageEvent.getRemoteAddress(), body, exchange); + NettyHelper.writeBodyAsync(messageEvent.getChannel(), messageEvent.getRemoteAddress(), body, exchange, listener); } } + } - // should channel be closed after complete? - Boolean close; - if (ExchangeHelper.isOutCapable(exchange)) { - close = exchange.getOut().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class); - } else { - close = exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class); - } + /** + * A {@link ChannelFutureListener} that performs the disconnect logic when + * sending the response is complete. + */ + private final class ResponseFutureListener implements ChannelFutureListener { + + private final Exchange exchange; + private final SocketAddress remoteAddress; + + private ResponseFutureListener(Exchange exchange, SocketAddress remoteAddress) { + this.exchange = exchange; + this.remoteAddress = remoteAddress; + } + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + // if it was not a success then thrown an exception + if (!future.isSuccess()) { + Exception e = new CamelExchangeException("Cannot write response to " + remoteAddress, exchange, future.getCause()); + consumer.getExceptionHandler().handleException(e); + } - // should we disconnect, the header can override the configuration - boolean disconnect = consumer.getConfiguration().isDisconnect(); - if (close != null) { - disconnect = close; - } - if (disconnect) { - if (LOG.isDebugEnabled()) { - LOG.debug("Closing channel when complete at address: {}", messageEvent.getRemoteAddress()); + // should channel be closed after complete? + Boolean close; + if (ExchangeHelper.isOutCapable(exchange)) { + close = exchange.getOut().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class); + } else { + close = exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class); + } + + // should we disconnect, the header can override the configuration + boolean disconnect = consumer.getConfiguration().isDisconnect(); + if (close != null) { + disconnect = close; + } + if (disconnect) { + if (LOG.isDebugEnabled()) { + LOG.debug("Closing channel when complete at address: {}", remoteAddress); + } + NettyHelper.close(future.getChannel()); } - NettyHelper.close(messageEvent.getChannel()); } }