Author: davsclaus
Date: Wed Jun 13 11:39:45 2012
New Revision: 1349766

URL: http://svn.apache.org/viewvc?rev=1349766&view=rev
Log:
CAMEL-5151: Fixed camel-netty to support proxy use cases.

Added:
    
camel/branches/camel-2.9.x/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyProxyTest.java
      - copied unchanged from r1349765, 
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyProxyTest.java
Modified:
    camel/branches/camel-2.9.x/   (props changed)
    
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
    
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
    
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java

Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
  Merged /camel/trunk:r1349765

Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: 
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java?rev=1349766&r1=1349765&r2=1349766&view=diff
==============================================================================
--- 
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
 (original)
+++ 
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
 Wed Jun 13 11:39:45 2012
@@ -18,11 +18,11 @@ package org.apache.camel.component.netty
 
 import java.net.SocketAddress;
 
-import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,16 +73,16 @@ public final class NettyHelper {
     }
 
     /**
-     * Writes the given body to Netty channel. Will wait until the body has 
been written.
+     * Writes the given body to Netty channel. Will <b>not</b> wait until the 
body has been written.
      *
      * @param channel         the Netty channel
      * @param remoteAddress   the remote address when using UDP
      * @param body            the body to write (send)
      * @param exchange        the exchange
-     * @throws CamelExchangeException is thrown if the body could not be 
written for some reasons
-     *                                (eg remote connection is closed etc.)
+     * @param listener        listener with work to be executed when the 
operation is complete
      */
-    public static void writeBodySync(Channel channel, SocketAddress 
remoteAddress, Object body, Exchange exchange) throws CamelExchangeException {
+    public static void writeBodyAsync(Channel channel, SocketAddress 
remoteAddress, Object body,
+                                      Exchange exchange, ChannelFutureListener 
listener) {
         // the write operation is asynchronous. Use future to wait until the 
session has been written
         ChannelFuture future;
         if (remoteAddress != null) {
@@ -91,15 +91,7 @@ public final class NettyHelper {
             future = channel.write(body);
         }
 
-        // wait for the write
-        LOG.trace("Waiting for write to complete");
-        future.awaitUninterruptibly();
-
-        // if it was not a success then thrown an exception
-        if (!future.isSuccess()) {
-            LOG.warn("Cannot write body: " + body + " using channel: " + 
channel);
-            throw new CamelExchangeException("Cannot write body", exchange, 
future.getCause());
-        }
+        future.addListener(listener);
     }
 
     /**

Modified: 
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=1349766&r1=1349765&r2=1349766&view=diff
==============================================================================
--- 
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
 (original)
+++ 
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
 Wed Jun 13 11:39:45 2012
@@ -17,6 +17,7 @@
 package org.apache.camel.component.netty;
 
 import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 
@@ -172,6 +173,7 @@ public class NettyProducer extends Defau
             // allow to reuse channel, on this producer, to avoid creating a 
new connection
             // for each message being sent
             if (channelFuture == null || channel == null || !channel.isOpen()) 
{
+                channel = null;
                 channelFuture = openConnection();
                 channel = openChannel(channelFuture);
             }
@@ -277,6 +279,7 @@ public class NettyProducer extends Defau
             // set the pipeline factory, which creates the pipeline for each 
newly created channels
             clientBootstrap.setPipelineFactory(pipelineFactory);
             answer = clientBootstrap.connect(new 
InetSocketAddress(configuration.getHost(), configuration.getPort()));
+            LOG.trace("Created new TCP client bootstrap connecting to {}:{}", 
configuration.getHost(), configuration.getPort());
             return answer;
         } else {
             ConnectionlessBootstrap connectionlessClientBootstrap = new 
ConnectionlessBootstrap(datagramChannelFactory);
@@ -294,17 +297,31 @@ public class NettyProducer extends Defau
             Channel channel = connectionlessClientBootstrap.bind(new 
InetSocketAddress(0));
             ALL_CHANNELS.add(channel);
             answer = connectionlessClientBootstrap.connect(new 
InetSocketAddress(configuration.getHost(), configuration.getPort()));
+            LOG.trace("Created new UDP client bootstrap connecting to {}:{}", 
configuration.getHost(), configuration.getPort());
             return answer;
         }
     }
 
     private Channel openChannel(ChannelFuture channelFuture) throws Exception {
-        // wait until we got connection
-        channelFuture.awaitUninterruptibly();
+        // wait until the operation is complete
+        final CountDownLatch latch = new CountDownLatch(1);
+        channelFuture.addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture channelFuture) throws 
Exception {
+                LOG.debug("Operation complete {}", channelFuture);
+                latch.countDown();
+            }
+        });
+        // blocking for channel to be done
+        LOG.trace("Waiting for operation to complete {}", channelFuture);
+        latch.await();
+
         if (!channelFuture.isSuccess()) {
+            // clear channel as we did not connect
+            channel = null;
             throw new CamelException("Cannot connect to " + 
configuration.getAddress(), channelFuture.getCause());
         }
-        Channel channel = channelFuture.getChannel();
+        channel = channelFuture.getChannel();
         // to keep track of all channels in use
         ALL_CHANNELS.add(channel);
 

Modified: 
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java?rev=1349766&r1=1349765&r2=1349766&view=diff
==============================================================================
--- 
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
 (original)
+++ 
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
 Wed Jun 13 11:39:45 2012
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.component.netty.handlers;
 
+import java.net.SocketAddress;
+
+import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.component.netty.NettyConstants;
@@ -25,6 +28,8 @@ import org.apache.camel.component.netty.
 import org.apache.camel.util.CamelLogger;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.IOHelper;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.ExceptionEvent;
@@ -132,31 +137,56 @@ public class ServerChannelHandler extend
 
             // we got a body to write
             LOG.debug("Writing body: {}", body);
+            ChannelFutureListener listener = new 
ResponseFutureListener(exchange, messageEvent.getRemoteAddress());
             if (consumer.getConfiguration().isTcp()) {
-                NettyHelper.writeBodySync(messageEvent.getChannel(), null, 
body, exchange);
+                NettyHelper.writeBodyAsync(messageEvent.getChannel(), null, 
body, exchange, listener);
             } else {
-                NettyHelper.writeBodySync(messageEvent.getChannel(), 
messageEvent.getRemoteAddress(), body, exchange);
+                NettyHelper.writeBodyAsync(messageEvent.getChannel(), 
messageEvent.getRemoteAddress(), body, exchange, listener);
             }
         }
+    }
 
-        // should channel be closed after complete?
-        Boolean close;
-        if (ExchangeHelper.isOutCapable(exchange)) {
-            close = 
exchange.getOut().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, 
Boolean.class);
-        } else {
-            close = 
exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, 
Boolean.class);
-        }
+    /**
+     * A {@link ChannelFutureListener} that performs the disconnect logic when
+     * sending the response is complete.
+     */
+    private final class ResponseFutureListener implements 
ChannelFutureListener {
+
+        private final Exchange exchange;
+        private final SocketAddress remoteAddress;
+
+        private ResponseFutureListener(Exchange exchange, SocketAddress 
remoteAddress) {
+            this.exchange = exchange;
+            this.remoteAddress = remoteAddress;
+        }
+
+        @Override
+        public void operationComplete(ChannelFuture future) throws Exception {
+            // if it was not a success then pass an exception to the exception handler
+            if (!future.isSuccess()) {
+                Exception e = new CamelExchangeException("Cannot write 
response to " + remoteAddress, exchange, future.getCause());
+                consumer.getExceptionHandler().handleException(e);
+            }
 
-        // should we disconnect, the header can override the configuration
-        boolean disconnect = consumer.getConfiguration().isDisconnect();
-        if (close != null) {
-            disconnect = close;
-        }
-        if (disconnect) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Closing channel when complete at address: {}", 
messageEvent.getRemoteAddress());
+            // should channel be closed after complete?
+            Boolean close;
+            if (ExchangeHelper.isOutCapable(exchange)) {
+                close = 
exchange.getOut().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, 
Boolean.class);
+            } else {
+                close = 
exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, 
Boolean.class);
+            }
+
+            // should we disconnect, the header can override the configuration
+            boolean disconnect = consumer.getConfiguration().isDisconnect();
+            if (close != null) {
+                disconnect = close;
+            }
+            if (disconnect) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Closing channel when complete at address: {}", 
remoteAddress);
+                }
+                NettyHelper.close(future.getChannel());
             }
-            NettyHelper.close(messageEvent.getChannel());
         }
     }
 


Reply via email to