Author: davsclaus Date: Thu Dec 20 10:04:23 2012 New Revision: 1424387 URL: http://svn.apache.org/viewvc?rev=1424387&view=rev Log: CAMEL-5899: The netty producer should honor the connection timeout while waiting for netty to open a connection, and keep better track of open/closed channels.
Modified: camel/branches/camel-2.10.x/ (props changed) camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java Propchange: camel/branches/camel-2.10.x/ ------------------------------------------------------------------------------ Merged /camel/trunk:r1424386 Propchange: camel/branches/camel-2.10.x/ ------------------------------------------------------------------------------ Binary property 'svnmerge-integrated' - no diff available. Modified: camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=1424387&r1=1424386&r2=1424387&view=diff ============================================================================== --- camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original) +++ camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Thu Dec 20 10:04:23 2012 @@ -18,10 +18,8 @@ package org.apache.camel.component.netty import java.net.InetSocketAddress; import java.util.Map; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; import org.apache.camel.AsyncCallback; import org.apache.camel.CamelContext; @@ -303,7 +301,7 @@ public class NettyProducer extends Defau } } - private ChannelFuture openConnection() throws Exception { + protected ChannelFuture openConnection() throws Exception { ChannelFuture answer; if (isTcp()) { @@ -363,20 +361,13 @@ public class NettyProducer extends Defau } private Channel 
openChannel(ChannelFuture channelFuture) throws Exception { - // wait until until the operation is complete - final CountDownLatch latch = new CountDownLatch(1); - channelFuture.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture channelFuture) throws Exception { - LOG.trace("Operation complete {}", channelFuture); - latch.countDown(); - } - }); // blocking for channel to be done - LOG.trace("Waiting for operation to complete {} for {} millis", channelFuture, configuration.getConnectTimeout()); - latch.await(configuration.getConnectTimeout(), TimeUnit.MILLISECONDS); + if (LOG.isTraceEnabled()) { + LOG.trace("Waiting for operation to complete {} for {} millis", channelFuture, configuration.getConnectTimeout()); + } + channelFuture.awaitUninterruptibly(configuration.getConnectTimeout()); - if (!channelFuture.isSuccess()) { + if (!channelFuture.isDone() || !channelFuture.isSuccess()) { throw new CamelException("Cannot connect to " + configuration.getAddress(), channelFuture.getCause()); } Channel answer = channelFuture.getChannel(); Modified: camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java?rev=1424387&r1=1424386&r2=1424387&view=diff ============================================================================== --- camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java (original) +++ camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java Thu Dec 20 10:04:23 2012 @@ -50,6 +50,9 @@ public class ClientChannelHandler extend @Override public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent channelStateEvent) 
throws Exception { + if (LOG.isTraceEnabled()) { + LOG.trace("Channel open: {}", ctx.getChannel()); + } // to keep track of open sockets producer.getAllChannels().add(channelStateEvent.getChannel()); } @@ -90,7 +93,9 @@ public class ClientChannelHandler extend @Override public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - LOG.trace("Channel closed: {}", ctx.getChannel()); + if (LOG.isTraceEnabled()) { + LOG.trace("Channel closed: {}", ctx.getChannel()); + } Exchange exchange = getExchange(ctx); AsyncCallback callback = getAsyncCallback(ctx); @@ -98,6 +103,9 @@ public class ClientChannelHandler extend // remove state producer.removeState(ctx.getChannel()); + // to keep track of open sockets + producer.getAllChannels().remove(ctx.getChannel()); + if (producer.getConfiguration().isSync() && !messageReceived && !exceptionHandled) { // session was closed but no message received. This could be because the remote server had an internal error // and could not return a response. We should count down to stop waiting for a response