Repository: camel
Updated Branches:
  refs/heads/camel-2.13.x f0e3e2015 -> f0ddb5ef7
  refs/heads/camel-2.14.x 6eb1275ab -> 7f730a86b


CAMEL-7909 camel-netty-http consumer need to close the connection if the 
response connection header is close


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/14ba1b38
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/14ba1b38
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/14ba1b38

Branch: refs/heads/camel-2.14.x
Commit: 14ba1b38f468d12d2d6425b3e92421b47df08155
Parents: 6eb1275
Author: Willem Jiang <willem.ji...@gmail.com>
Authored: Tue Oct 14 10:14:12 2014 +0800
Committer: Willem Jiang <willem.ji...@gmail.com>
Committed: Fri Oct 17 11:25:22 2014 +0800

----------------------------------------------------------------------
 .../netty/http/DefaultNettyHttpBinding.java       |  5 +++++
 .../http/handlers/HttpServerChannelHandler.java   | 18 ++----------------
 .../apache/camel/component/netty/NettyHelper.java |  2 +-
 .../camel/component/netty/NettyProducer.java      | 13 ++++++++++---
 .../handlers/ServerResponseFutureListener.java    |  5 +++++
 5 files changed, 23 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/14ba1b38/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
index eed39b0..102fb91 100644
--- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
@@ -34,6 +34,7 @@ import org.apache.camel.Message;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.TypeConverter;
+import org.apache.camel.component.netty.NettyConstants;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.IOHelper;
@@ -411,6 +412,10 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable {
             }
         }
         response.headers().set(HttpHeaders.Names.CONNECTION, connection);
+        // Just make sure we close the channel when the connection value is close
+        if (connection.equalsIgnoreCase(HttpHeaders.Values.CLOSE)) {
+            message.setHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, true);
+        }
         LOG.trace("Connection: {}", connection);
 
         return response;

http://git-wip-us.apache.org/repos/asf/camel/blob/14ba1b38/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
index da17d25..cb92d44 100644
--- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.component.netty.http.handlers;
 
-import java.net.SocketAddress;
 import java.net.URI;
 import java.nio.channels.ClosedChannelException;
 import java.nio.charset.Charset;
@@ -27,7 +26,6 @@ import javax.security.auth.login.LoginException;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
-import org.apache.camel.component.netty.NettyConsumer;
 import org.apache.camel.component.netty.NettyHelper;
 import org.apache.camel.component.netty.handlers.ServerChannelHandler;
 import org.apache.camel.component.netty.http.HttpPrincipal;
@@ -38,7 +36,6 @@ import org.apache.camel.util.CamelLogger;
 import org.apache.camel.util.ObjectHelper;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.ChannelFutureListener;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.ExceptionEvent;
 import org.jboss.netty.channel.MessageEvent;
@@ -50,7 +47,6 @@ import org.jboss.netty.handler.codec.http.HttpResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
 import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
 import static org.jboss.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
 import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
@@ -58,6 +54,7 @@ import static org.jboss.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAV
 import static org.jboss.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
 import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
 
+
 /**
  * Netty HTTP {@link ServerChannelHandler} that handles the incoming HTTP requests and routes
  * the received message in Camel.
@@ -303,18 +300,7 @@ public class HttpServerChannelHandler extends ServerChannelHandler {
             }
         }
     }
-
-    @Override
-    protected ChannelFutureListener createResponseFutureListener(NettyConsumer consumer, Exchange exchange, SocketAddress remoteAddress) {
-        // make sure to close channel if not keep-alive
-        if (request != null && isKeepAlive(request)) {
-            LOG.trace("Request has Connection: keep-alive so Channel is not being closed");
-            return null;
-        } else {
-            LOG.trace("Request is not Connection: close so Channel is being closed");
-            return ChannelFutureListener.CLOSE;
-        }
-    }
+    
 
     @Override
     protected Object getResponseBody(Exchange exchange) throws Exception {

http://git-wip-us.apache.org/repos/asf/camel/blob/14ba1b38/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
index 05f3e4d..b9368fa 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
@@ -111,7 +111,7 @@ public final class NettyHelper {
     public static void close(Channel channel) {
         if (channel != null) {
             LOG.trace("Closing channel: {}", channel);
-            channel.close();
+            channel.close().syncUninterruptibly();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/14ba1b38/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
index 50de736..87ad2be 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
@@ -498,8 +498,11 @@ public class NettyProducer extends DefaultAsyncProducer {
         public void done(boolean doneSync) {
             // put back in pool
             try {
-                LOG.trace("Putting channel back to pool {}", channel);
-                pool.returnObject(channel);
+                // Only put the connected channel back to the pool
+                if (channel.isConnected()) {
+                    LOG.trace("Putting channel back to pool {}", channel);
+                    pool.returnObject(channel);
+                }
             } catch (Exception e) {
                 LOG.warn("Error returning channel to pool {}. This exception will be ignored.", channel);
             } finally {
@@ -525,7 +528,9 @@ public class NettyProducer extends DefaultAsyncProducer {
         @Override
         public void destroyObject(Channel channel) throws Exception {
             LOG.trace("Destroying channel: {}", channel);
-            NettyHelper.close(channel);
+            if (channel.isOpen()) {
+                NettyHelper.close(channel);
+            }
             allChannels.remove(channel);
         }
 
@@ -540,11 +545,13 @@ public class NettyProducer extends DefaultAsyncProducer {
         @Override
         public void activateObject(Channel channel) throws Exception {
             // noop
+            LOG.trace("activateObject channel: {} -> {}", channel);
         }
 
         @Override
         public void passivateObject(Channel channel) throws Exception {
             // noop
+            LOG.trace("passivateObject channel: {} -> {}", channel);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/14ba1b38/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerResponseFutureListener.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerResponseFutureListener.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerResponseFutureListener.java
index 619a62e..90dc9e6 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerResponseFutureListener.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerResponseFutureListener.java
@@ -61,6 +61,11 @@ public class ServerResponseFutureListener implements ChannelFutureListener {
         } else {
             close = exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
         }
+        
+        // check the setting on the exchange property
+        if (close == null) {
+            close = exchange.getProperty(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
+        }
 
         // should we disconnect, the header can override the configuration
         boolean disconnect = consumer.getConfiguration().isDisconnect();

Reply via email to