Author: davsclaus
Date: Sun Apr  8 12:54:55 2012
New Revision: 1310993

URL: http://svn.apache.org/viewvc?rev=1310993&view=rev
Log:
CAMEL-4556: Netty producer now reuses connection.

Added:
    
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java
      - copied unchanged from r1310989, 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java
Modified:
    camel/branches/camel-2.9.x/   (props changed)
    
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java
    
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
    
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
    
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
    
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
    
camel/branches/camel-2.9.x/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java
    
camel/branches/camel-2.9.x/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java

Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
  Merged /camel/trunk:r1310989

Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: 
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java?rev=1310993&r1=1310992&r2=1310993&view=diff
==============================================================================
--- 
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java
 (original)
+++ 
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java
 Sun Apr  8 12:54:55 2012
@@ -16,24 +16,18 @@
  */
 package org.apache.camel.component.netty;
 
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.Exchange;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.Channels;
 
 public abstract class ClientPipelineFactory implements ChannelPipelineFactory {
     protected NettyProducer producer;
-    protected Exchange exchange;
-    protected AsyncCallback callback;
 
     public ClientPipelineFactory() {
     }
-    
-    public ClientPipelineFactory(NettyProducer producer, Exchange exchange, 
AsyncCallback callback) {
+
+    public ClientPipelineFactory(NettyProducer producer) {
         this.producer = producer;
-        this.exchange = exchange;
-        this.callback = callback;
     }
     
     public ChannelPipeline getPipeline() throws Exception {
@@ -48,21 +42,4 @@ public abstract class ClientPipelineFact
     public void setProducer(NettyProducer producer) {
         this.producer = producer;
     }
-
-    public Exchange getExchange() {
-        return exchange;
-    }
-
-    public void setExchange(Exchange exchange) {
-        this.exchange = exchange;
-    }
-
-    public AsyncCallback getCallback() {
-        return callback;
-    }
-
-    public void setCallback(AsyncCallback callback) {
-        this.callback = callback;
-    }
-
 }

Modified: 
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java?rev=1310993&r1=1310992&r2=1310993&view=diff
==============================================================================
--- 
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
 (original)
+++ 
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
 Sun Apr  8 12:54:55 2012
@@ -17,12 +17,9 @@
 package org.apache.camel.component.netty;
 
 import java.util.List;
-
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.Exchange;
 import org.apache.camel.component.netty.handlers.ClientChannelHandler;
 import org.apache.camel.component.netty.ssl.SSLEngineFactory;
 import org.jboss.netty.channel.ChannelDownstreamHandler;
@@ -34,10 +31,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class DefaultClientPipelineFactory extends ClientPipelineFactory {
-    private static final transient Logger LOG = 
LoggerFactory.getLogger(ClientPipelineFactory.class);
+    private static final transient Logger LOG = 
LoggerFactory.getLogger(DefaultClientPipelineFactory.class);
 
-    public DefaultClientPipelineFactory(NettyProducer producer, Exchange 
exchange, AsyncCallback callback) {
-        super(producer, exchange, callback);
+    public DefaultClientPipelineFactory(NettyProducer producer) {
+        super(producer);
     }
 
     public ChannelPipeline getPipeline() throws Exception {
@@ -61,7 +58,7 @@ public class DefaultClientPipelineFactor
         }
 
         // our handler must be added last
-        channelPipeline.addLast("handler", new ClientChannelHandler(producer, 
exchange, callback));
+        channelPipeline.addLast("handler", new ClientChannelHandler(producer));
 
         return channelPipeline;
     }

Modified: 
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=1310993&r1=1310992&r2=1310993&view=diff
==============================================================================
--- 
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
 (original)
+++ 
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
 Sun Apr  8 12:54:55 2012
@@ -27,7 +27,6 @@ import org.apache.camel.Exchange;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.ServicePoolAware;
 import org.apache.camel.impl.DefaultAsyncProducer;
-import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.util.CamelLogger;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.IOHelper;
@@ -37,6 +36,7 @@ import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelLocal;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.group.ChannelGroup;
 import org.jboss.netty.channel.group.ChannelGroupFuture;
@@ -57,6 +57,9 @@ public class NettyProducer extends Defau
     private CamelLogger noReplyLogger;
     private ExecutorService bossExecutor;
     private ExecutorService workerExecutor;
+    private final ChannelLocal<NettyCamelState> state = new 
ChannelLocal<NettyCamelState>();
+    private ChannelFuture channelFuture;
+    private Channel channel;
 
     public NettyProducer(NettyEndpoint nettyEndpoint, NettyConfiguration 
configuration) {
         super(nettyEndpoint);
@@ -157,11 +160,16 @@ public class NettyProducer extends Defau
             exchange.setProperty(Exchange.CHARSET_NAME, 
IOHelper.normalizeCharset(getConfiguration().getCharsetName()));
         }
 
-        ChannelFuture channelFuture;
-        final Channel channel;
         try {
-            channelFuture = openConnection(exchange, callback);
-            channel = openChannel(channelFuture);
+            // allow to reuse channel, on this producer, to avoid creating a 
new connection
+            // for each message being sent
+            if (channelFuture == null || channel == null || !channel.isOpen()) 
{
+                channelFuture = openConnection();
+                channel = openChannel(channelFuture);
+            }
+            // set up the state now that we have the channel; we can do this because
+            // this producer is not thread safe, but is pooled using 
ServicePoolAware
+            state.set(channel, new NettyCamelState(callback, exchange));
         } catch (Exception e) {
             exchange.setException(e);
             callback.done(true);
@@ -218,6 +226,21 @@ public class NettyProducer extends Defau
         return false;
     }
 
+    /**
+     * To get the {@link NettyCamelState} from this producer.
+     */
+    public NettyCamelState getState(Channel channel) {
+        return state.get(channel);
+    }
+
+    /**
+     * To remove the {@link NettyCamelState} stored on this producer,
+     * when no longer needed
+     */
+    public void removeState(Channel channel) {
+        state.remove(channel);
+    }
+
     protected void setupTCPCommunication() throws Exception {
         if (channelFactory == null) {
             bossExecutor = 
context.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPBoss");
@@ -233,19 +256,17 @@ public class NettyProducer extends Defau
         }
     }
 
-    private ChannelFuture openConnection(Exchange exchange, AsyncCallback 
callback) throws Exception {
+    private ChannelFuture openConnection() throws Exception {
         ChannelFuture answer;
         ChannelPipeline clientPipeline;
 
         if (configuration.getClientPipelineFactory() != null) {
             // initialize user defined client pipeline factory
             configuration.getClientPipelineFactory().setProducer(this);
-            configuration.getClientPipelineFactory().setExchange(exchange);
-            configuration.getClientPipelineFactory().setCallback(callback);
             clientPipeline = 
configuration.getClientPipelineFactory().getPipeline();
         } else {
             // initialize client pipeline factory
-            ClientPipelineFactory clientPipelineFactory = new 
DefaultClientPipelineFactory(this, exchange, callback);
+            ClientPipelineFactory clientPipelineFactory = new 
DefaultClientPipelineFactory(this);
             // must get the pipeline from the factory when opening a new 
connection
             clientPipeline = clientPipelineFactory.getPipeline();
         }
@@ -295,13 +316,10 @@ public class NettyProducer extends Defau
     }
 
     private void openAndCloseConnection() throws Exception {
-        ChannelFuture future = openConnection(new DefaultExchange(context), 
new AsyncCallback() {
-            public void done(boolean doneSync) {
-                // noop
-            }
-        });
+        ChannelFuture future = openConnection();
         Channel channel = openChannel(future);
         NettyHelper.close(channel);
+        ALL_CHANNELS.remove(channel);
     }
 
     public NettyConfiguration getConfiguration() {

Modified: 
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java?rev=1310993&r1=1310992&r2=1310993&view=diff
==============================================================================
--- 
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
 (original)
+++ 
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
 Sun Apr  8 12:54:55 2012
@@ -20,6 +20,7 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.NoTypeConversionAvailableException;
+import org.apache.camel.component.netty.NettyCamelState;
 import org.apache.camel.component.netty.NettyConstants;
 import org.apache.camel.component.netty.NettyHelper;
 import org.apache.camel.component.netty.NettyPayloadHelper;
@@ -39,15 +40,11 @@ import org.slf4j.LoggerFactory;
 public class ClientChannelHandler extends SimpleChannelUpstreamHandler {
     private static final transient Logger LOG = 
LoggerFactory.getLogger(ClientChannelHandler.class);
     private final NettyProducer producer;
-    private final Exchange exchange;
-    private final AsyncCallback callback;
     private boolean messageReceived;
     private volatile boolean exceptionHandled;
 
-    public ClientChannelHandler(NettyProducer producer, Exchange exchange, 
AsyncCallback callback) {
+    public ClientChannelHandler(NettyProducer producer) {
         this.producer = producer;
-        this.exchange = exchange;
-        this.callback = callback;
     }
 
     @Override
@@ -73,20 +70,33 @@ public class ClientChannelHandler extend
         if (LOG.isDebugEnabled()) {
             LOG.debug("Closing channel as an exception was thrown from Netty", 
cause);
         }
-        // set the cause on the exchange
-        exchange.setException(cause);
 
-        // close channel in case an exception was thrown
-        NettyHelper.close(exceptionEvent.getChannel());
+        Exchange exchange = getExchange(ctx);
+        AsyncCallback callback = getAsyncCallback(ctx);
 
-        // signal callback
-        callback.done(false);
+        // the state may not be set
+        if (exchange != null && callback != null) {
+            // set the cause on the exchange
+            exchange.setException(cause);
+
+            // close channel in case an exception was thrown
+            NettyHelper.close(exceptionEvent.getChannel());
+
+            // signal callback
+            callback.done(false);
+        }
     }
 
     @Override
     public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) 
throws Exception {
         LOG.trace("Channel closed: {}", ctx.getChannel());
 
+        Exchange exchange = getExchange(ctx);
+        AsyncCallback callback = getAsyncCallback(ctx);
+
+        // remove state
+        producer.removeState(ctx.getChannel());
+
         if (producer.getConfiguration().isSync() && !messageReceived && 
!exceptionHandled) {
             // session was closed but no message received. This could be 
because the remote server had an internal error
             // and could not return a response. We should count down to stop 
waiting for a response
@@ -103,6 +113,9 @@ public class ClientChannelHandler extend
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent 
messageEvent) throws Exception {
         messageReceived = true;
 
+        Exchange exchange = getExchange(ctx);
+        AsyncCallback callback = getAsyncCallback(ctx);
+
         Object body = messageEvent.getMessage();
         LOG.debug("Message received: {}", body);
 
@@ -150,4 +163,14 @@ public class ClientChannelHandler extend
         }
     }
 
+    private Exchange getExchange(ChannelHandlerContext ctx) {
+        NettyCamelState state = producer.getState(ctx.getChannel());
+        return state != null ? state.getExchange() : null;
+    }
+
+    private AsyncCallback getAsyncCallback(ChannelHandlerContext ctx) {
+        NettyCamelState state = producer.getState(ctx.getChannel());
+        return state != null ? state.getCallback() : null;
+    }
+
 }

Modified: 
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java?rev=1310993&r1=1310992&r2=1310993&view=diff
==============================================================================
--- 
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
 (original)
+++ 
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
 Sun Apr  8 12:54:55 2012
@@ -25,7 +25,6 @@ import org.apache.camel.component.netty.
 import org.apache.camel.util.CamelLogger;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.IOHelper;
-import org.jboss.netty.channel.ChannelHandler;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.ExceptionEvent;
@@ -35,9 +34,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Server handler which is shared
+ * Server handler which cannot be shared
  */
-@ChannelHandler.Sharable
 public class ServerChannelHandler extends SimpleChannelUpstreamHandler {
     private static final transient Logger LOG = 
LoggerFactory.getLogger(ServerChannelHandler.class);
     private NettyConsumer consumer;

Modified: 
camel/branches/camel-2.9.x/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java?rev=1310993&r1=1310992&r2=1310993&view=diff
==============================================================================
--- 
camel/branches/camel-2.9.x/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java
 (original)
+++ 
camel/branches/camel-2.9.x/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java
 Sun Apr  8 12:54:55 2012
@@ -97,10 +97,9 @@ public class NettyCustomPipelineFactoryA
             channelPipeline.addLast("decoder-DELIM", new 
DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter()));
             channelPipeline.addLast("decoder-SD", new 
StringDecoder(CharsetUtil.UTF_8));
             channelPipeline.addLast("encoder-SD", new 
StringEncoder(CharsetUtil.UTF_8));            
-            channelPipeline.addLast("handler", new 
ClientChannelHandler(producer, exchange, callback));
+            channelPipeline.addLast("handler", new 
ClientChannelHandler(producer));
 
             return channelPipeline;
-
         }
         
         public boolean isfactoryInvoked() {

Modified: 
camel/branches/camel-2.9.x/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java?rev=1310993&r1=1310992&r2=1310993&view=diff
==============================================================================
--- 
camel/branches/camel-2.9.x/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java
 (original)
+++ 
camel/branches/camel-2.9.x/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java
 Sun Apr  8 12:54:55 2012
@@ -97,7 +97,7 @@ public class NettyCustomPipelineFactoryS
             channelPipeline.addLast("decoder-DELIM", new 
DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter()));
             channelPipeline.addLast("decoder-SD", new 
StringDecoder(CharsetUtil.UTF_8));
             channelPipeline.addLast("encoder-SD", new 
StringEncoder(CharsetUtil.UTF_8));            
-            channelPipeline.addLast("handler", new 
ClientChannelHandler(producer, exchange, callback));
+            channelPipeline.addLast("handler", new 
ClientChannelHandler(producer));
 
             return channelPipeline;
         }


Reply via email to