Author: davsclaus
Date: Sun Apr  8 11:28:44 2012
New Revision: 1310979

URL: http://svn.apache.org/viewvc?rev=1310979&view=rev
Log:
CAMEL-5150: Some cleanup in camel-netty according to Netty docs.

Added:
    
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseConnectionTest.java
Modified:
    
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
    
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
    
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
    
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
    camel/trunk/components/camel-netty/src/test/resources/log4j.properties

Modified: 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=1310979&r1=1310978&r2=1310979&view=diff
==============================================================================
--- 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
 (original)
+++ 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
 Sun Apr  8 11:28:44 2012
@@ -68,8 +68,6 @@ public class NettyConfiguration implemen
     private long sendBufferSize = 65536;
     private long receiveBufferSize = 65536;
     private int receiveBufferSizePredictor;
-    private int corePoolSize = 10;
-    private int maxPoolSize = 100;
     private int workerCount;
     private String keyStoreFormat;
     private String securityProvider;
@@ -385,22 +383,6 @@ public class NettyConfiguration implemen
         this.trustStoreFile = trustStoreFile;
     }
 
-    public int getCorePoolSize() {
-        return corePoolSize;
-    }
-
-    public void setCorePoolSize(int corePoolSize) {
-        this.corePoolSize = corePoolSize;
-    }
-
-    public int getMaxPoolSize() {
-        return maxPoolSize;
-    }
-
-    public void setMaxPoolSize(int maxPoolSize) {
-        this.maxPoolSize = maxPoolSize;
-    }
-
     public String getKeyStoreFormat() {
         return keyStoreFormat;
     }

Modified: 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java?rev=1310979&r1=1310978&r2=1310979&view=diff
==============================================================================
--- 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
 (original)
+++ 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
 Sun Apr  8 11:28:44 2012
@@ -46,12 +46,14 @@ public class NettyConsumer extends Defau
     private ServerBootstrap serverBootstrap;
     private ConnectionlessBootstrap connectionlessServerBootstrap;
     private Channel channel;
+    private ExecutorService bossExecutor;
+    private ExecutorService workerExecutor;
 
     public NettyConsumer(NettyEndpoint nettyEndpoint, Processor processor, 
NettyConfiguration configuration) {
         super(nettyEndpoint, processor);
         this.context = this.getEndpoint().getCamelContext();
         this.configuration = configuration;
-        this.allChannels = new DefaultChannelGroup("NettyProducer-" + 
nettyEndpoint.getEndpointUri());
+        this.allChannels = new DefaultChannelGroup("NettyConsumer-" + 
nettyEndpoint.getEndpointUri());
     }
 
     @Override
@@ -78,14 +80,23 @@ public class NettyConsumer extends Defau
         LOG.debug("Netty consumer unbinding from: {}", 
configuration.getAddress());
 
         // close all channels
+        LOG.trace("Closing {} channels", allChannels.size());
         ChannelGroupFuture future = allChannels.close();
         future.awaitUninterruptibly();
 
-        // and then release other resources
+        // close server external resources
         if (channelFactory != null) {
             channelFactory.releaseExternalResources();
         }
 
+        // and then shutdown the thread pools
+        if (bossExecutor != null) {
+            context.getExecutorServiceManager().shutdownNow(bossExecutor);
+        }
+        if (workerExecutor != null) {
+            context.getExecutorServiceManager().shutdownNow(workerExecutor);
+        }
+
         super.doStop();
 
         LOG.info("Netty consumer unbound from: " + configuration.getAddress());
@@ -144,12 +155,10 @@ public class NettyConsumer extends Defau
     }
 
     private void initializeTCPServerSocketCommunicationLayer() throws 
Exception {
-        ExecutorService bossExecutor = 
context.getExecutorServiceManager().newThreadPool(this, "NettyTCPBoss",
-                configuration.getCorePoolSize(), 
configuration.getMaxPoolSize());
-        ExecutorService workerExecutor = 
context.getExecutorServiceManager().newThreadPool(this, "NettyTCPWorker",
-                configuration.getCorePoolSize(), 
configuration.getMaxPoolSize());
+        bossExecutor = 
context.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPBoss");
+        workerExecutor = 
context.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPWorker");
 
-        if (configuration.getWorkerCount() == 0) {
+        if (configuration.getWorkerCount() <= 0) {
             channelFactory = new NioServerSocketChannelFactory(bossExecutor, 
workerExecutor);
         } else {
             channelFactory = new NioServerSocketChannelFactory(bossExecutor, 
workerExecutor,
@@ -164,6 +173,7 @@ public class NettyConsumer extends Defau
         }
         serverBootstrap.setOption("child.keepAlive", 
configuration.isKeepAlive());
         serverBootstrap.setOption("child.tcpNoDelay", 
configuration.isTcpNoDelay());
+        serverBootstrap.setOption("reuseAddress", 
configuration.isReuseAddress());
         serverBootstrap.setOption("child.reuseAddress", 
configuration.isReuseAddress());
         serverBootstrap.setOption("child.connectTimeoutMillis", 
configuration.getConnectTimeout());
 
@@ -173,10 +183,12 @@ public class NettyConsumer extends Defau
     }
 
     private void initializeUDPServerSocketCommunicationLayer() throws 
Exception {
-        ExecutorService workerExecutor = 
context.getExecutorServiceManager().newThreadPool(this, "NettyUDPWorker",
-                configuration.getCorePoolSize(), 
configuration.getMaxPoolSize());
-
-        datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor);
+        workerExecutor = 
context.getExecutorServiceManager().newCachedThreadPool(this, "NettyUDPWorker");
+        if (configuration.getWorkerCount() <= 0) {
+            datagramChannelFactory = new 
NioDatagramChannelFactory(workerExecutor);
+        } else {
+            datagramChannelFactory = new 
NioDatagramChannelFactory(workerExecutor, configuration.getWorkerCount());
+        }
         connectionlessServerBootstrap = new 
ConnectionlessBootstrap(datagramChannelFactory);
         if (configuration.getServerPipelineFactory() != null) {
             configuration.getServerPipelineFactory().setConsumer(this);
@@ -186,6 +198,7 @@ public class NettyConsumer extends Defau
         }
         connectionlessServerBootstrap.setOption("child.keepAlive", 
configuration.isKeepAlive());
         connectionlessServerBootstrap.setOption("child.tcpNoDelay", 
configuration.isTcpNoDelay());
+        connectionlessServerBootstrap.setOption("reuseAddress", 
configuration.isReuseAddress());
         connectionlessServerBootstrap.setOption("child.reuseAddress", 
configuration.isReuseAddress());
         connectionlessServerBootstrap.setOption("child.connectTimeoutMillis", 
configuration.getConnectTimeout());
         connectionlessServerBootstrap.setOption("child.broadcast", 
configuration.isBroadcast());

Modified: 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=1310979&r1=1310978&r2=1310979&view=diff
==============================================================================
--- 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
 (original)
+++ 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
 Sun Apr  8 11:28:44 2012
@@ -55,6 +55,8 @@ public class NettyProducer extends Defau
     private ChannelFactory channelFactory;
     private DatagramChannelFactory datagramChannelFactory;
     private CamelLogger noReplyLogger;
+    private ExecutorService bossExecutor;
+    private ExecutorService workerExecutor;
 
     public NettyProducer(NettyEndpoint nettyEndpoint, NettyConfiguration 
configuration) {
         super(nettyEndpoint);
@@ -103,6 +105,7 @@ public class NettyProducer extends Defau
     protected void doStop() throws Exception {
         LOG.debug("Stopping producer at address: {}", 
configuration.getAddress());
         // close all channels
+        LOG.trace("Closing {} channels", ALL_CHANNELS.size());
         ChannelGroupFuture future = ALL_CHANNELS.close();
         future.awaitUninterruptibly();
 
@@ -110,6 +113,15 @@ public class NettyProducer extends Defau
         if (channelFactory != null) {
             channelFactory.releaseExternalResources();
         }
+
+        // and then shutdown the thread pools
+        if (bossExecutor != null) {
+            context.getExecutorServiceManager().shutdownNow(bossExecutor);
+        }
+        if (workerExecutor != null) {
+            context.getExecutorServiceManager().shutdownNow(workerExecutor);
+        }
+
         super.doStop();
     }
 
@@ -208,18 +220,15 @@ public class NettyProducer extends Defau
 
     protected void setupTCPCommunication() throws Exception {
         if (channelFactory == null) {
-            ExecutorService bossExecutor = 
context.getExecutorServiceManager().newThreadPool(this, "NettyTCPBoss",
-                    configuration.getCorePoolSize(), 
configuration.getMaxPoolSize());
-            ExecutorService workerExecutor = 
context.getExecutorServiceManager().newThreadPool(this, "NettyTCPWorker",
-                    configuration.getCorePoolSize(), 
configuration.getMaxPoolSize());
+            bossExecutor = 
context.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPBoss");
+            workerExecutor = 
context.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPWorker");
             channelFactory = new NioClientSocketChannelFactory(bossExecutor, 
workerExecutor);
         }
     }
 
     protected void setupUDPCommunication() throws Exception {
         if (datagramChannelFactory == null) {
-            ExecutorService workerExecutor = 
context.getExecutorServiceManager().newThreadPool(this, "NettyUDPWorker",
-                    configuration.getCorePoolSize(), 
configuration.getMaxPoolSize());
+            workerExecutor = 
context.getExecutorServiceManager().newCachedThreadPool(this, "NettyUDPWorker");
             datagramChannelFactory = new 
NioDatagramChannelFactory(workerExecutor);
         }
     }
@@ -243,16 +252,17 @@ public class NettyProducer extends Defau
 
         if (isTcp()) {
             ClientBootstrap clientBootstrap = new 
ClientBootstrap(channelFactory);
-            clientBootstrap.setOption("child.keepAlive", 
configuration.isKeepAlive());
-            clientBootstrap.setOption("child.tcpNoDelay", 
configuration.isTcpNoDelay());
-            clientBootstrap.setOption("child.reuseAddress", 
configuration.isReuseAddress());
-            clientBootstrap.setOption("child.connectTimeoutMillis", 
configuration.getConnectTimeout());
+            clientBootstrap.setOption("keepAlive", 
configuration.isKeepAlive());
+            clientBootstrap.setOption("tcpNoDelay", 
configuration.isTcpNoDelay());
+            clientBootstrap.setOption("reuseAddress", 
configuration.isReuseAddress());
+            clientBootstrap.setOption("connectTimeoutMillis", 
configuration.getConnectTimeout());
 
             // set the pipeline on the bootstrap
             clientBootstrap.setPipeline(clientPipeline);
             answer = clientBootstrap.connect(new 
InetSocketAddress(configuration.getHost(), configuration.getPort()));
             return answer;
         } else {
+            // TODO: Is this correct for a UDP client
             ConnectionlessBootstrap connectionlessClientBootstrap = new 
ConnectionlessBootstrap(datagramChannelFactory);
             connectionlessClientBootstrap.setOption("child.keepAlive", 
configuration.isKeepAlive());
             connectionlessClientBootstrap.setOption("child.tcpNoDelay", 
configuration.isTcpNoDelay());

Modified: 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java?rev=1310979&r1=1310978&r2=1310979&view=diff
==============================================================================
--- 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
 (original)
+++ 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
 Sun Apr  8 11:28:44 2012
@@ -42,7 +42,7 @@ public class ClientChannelHandler extend
     private final Exchange exchange;
     private final AsyncCallback callback;
     private boolean messageReceived;
-    private boolean exceptionHandled;
+    private volatile boolean exceptionHandled;
 
     public ClientChannelHandler(NettyProducer producer, Exchange exchange, 
AsyncCallback callback) {
         this.producer = producer;

Added: 
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseConnectionTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseConnectionTest.java?rev=1310979&view=auto
==============================================================================
--- 
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseConnectionTest.java
 (added)
+++ 
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseConnectionTest.java
 Sun Apr  8 11:28:44 2012
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class NettyReuseConnectionTest extends BaseNettyTest {
+
+    private String uri = 
"netty:tcp://localhost:{{port}}?sync=true&disconnect=false";
+
+    @Test
+    public void testReuseConnection() throws Exception {
+        for (int i = 0; i < 20; i++) {
+            String out = template.requestBody(uri, "" + i, String.class);
+            assertEquals("Reply " + i, out);
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from(uri).transform().simple("Reply ${body}");
+            }
+        };
+    }
+}

Modified: camel/trunk/components/camel-netty/src/test/resources/log4j.properties
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/resources/log4j.properties?rev=1310979&r1=1310978&r2=1310979&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/test/resources/log4j.properties 
(original)
+++ camel/trunk/components/camel-netty/src/test/resources/log4j.properties Sun 
Apr  8 11:28:44 2012
@@ -23,7 +23,7 @@ log4j.rootLogger=INFO, file
 # uncomment the following to enable camel debugging
 #log4j.logger.org.apache.camel.component.netty=TRACE
 #log4j.logger.org.apache.camel=DEBUG
-#log4j.logger.org.apache.commons.net=TRACE
+#log4j.logger.org.jboss.netty=TRACE
 
 # CONSOLE appender not used by default
 log4j.appender.out=org.apache.log4j.ConsoleAppender


Reply via email to