Author: davsclaus
Date: Sun Apr 8 11:28:44 2012
New Revision: 1310979

URL: http://svn.apache.org/viewvc?rev=1310979&view=rev
Log:
CAMEL-5150: Some cleanup in camel-netty according to Netty docs.
Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseConnectionTest.java Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java camel/trunk/components/camel-netty/src/test/resources/log4j.properties Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=1310979&r1=1310978&r2=1310979&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java Sun Apr 8 11:28:44 2012 @@ -68,8 +68,6 @@ public class NettyConfiguration implemen private long sendBufferSize = 65536; private long receiveBufferSize = 65536; private int receiveBufferSizePredictor; - private int corePoolSize = 10; - private int maxPoolSize = 100; private int workerCount; private String keyStoreFormat; private String securityProvider; @@ -385,22 +383,6 @@ public class NettyConfiguration implemen this.trustStoreFile = trustStoreFile; } - public int getCorePoolSize() { - return corePoolSize; - } - - public void setCorePoolSize(int corePoolSize) { - this.corePoolSize = corePoolSize; - } - - public int getMaxPoolSize() { - return maxPoolSize; - } - - public void setMaxPoolSize(int maxPoolSize) { - this.maxPoolSize = maxPoolSize; - } - 
public String getKeyStoreFormat() { return keyStoreFormat; } Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java?rev=1310979&r1=1310978&r2=1310979&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java Sun Apr 8 11:28:44 2012 @@ -46,12 +46,14 @@ public class NettyConsumer extends Defau private ServerBootstrap serverBootstrap; private ConnectionlessBootstrap connectionlessServerBootstrap; private Channel channel; + private ExecutorService bossExecutor; + private ExecutorService workerExecutor; public NettyConsumer(NettyEndpoint nettyEndpoint, Processor processor, NettyConfiguration configuration) { super(nettyEndpoint, processor); this.context = this.getEndpoint().getCamelContext(); this.configuration = configuration; - this.allChannels = new DefaultChannelGroup("NettyProducer-" + nettyEndpoint.getEndpointUri()); + this.allChannels = new DefaultChannelGroup("NettyConsumer-" + nettyEndpoint.getEndpointUri()); } @Override @@ -78,14 +80,23 @@ public class NettyConsumer extends Defau LOG.debug("Netty consumer unbinding from: {}", configuration.getAddress()); // close all channels + LOG.trace("Closing {} channels", allChannels.size()); ChannelGroupFuture future = allChannels.close(); future.awaitUninterruptibly(); - // and then release other resources + // close server external resources if (channelFactory != null) { channelFactory.releaseExternalResources(); } + // and then shutdown the thread pools + if (bossExecutor != null) { + context.getExecutorServiceManager().shutdownNow(bossExecutor); + } + if (workerExecutor != null) { + 
context.getExecutorServiceManager().shutdownNow(workerExecutor); + } + super.doStop(); LOG.info("Netty consumer unbound from: " + configuration.getAddress()); @@ -144,12 +155,10 @@ public class NettyConsumer extends Defau } private void initializeTCPServerSocketCommunicationLayer() throws Exception { - ExecutorService bossExecutor = context.getExecutorServiceManager().newThreadPool(this, "NettyTCPBoss", - configuration.getCorePoolSize(), configuration.getMaxPoolSize()); - ExecutorService workerExecutor = context.getExecutorServiceManager().newThreadPool(this, "NettyTCPWorker", - configuration.getCorePoolSize(), configuration.getMaxPoolSize()); + bossExecutor = context.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPBoss"); + workerExecutor = context.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPWorker"); - if (configuration.getWorkerCount() == 0) { + if (configuration.getWorkerCount() <= 0) { channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor); } else { channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor, @@ -164,6 +173,7 @@ public class NettyConsumer extends Defau } serverBootstrap.setOption("child.keepAlive", configuration.isKeepAlive()); serverBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay()); + serverBootstrap.setOption("reuseAddress", configuration.isReuseAddress()); serverBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress()); serverBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout()); @@ -173,10 +183,12 @@ public class NettyConsumer extends Defau } private void initializeUDPServerSocketCommunicationLayer() throws Exception { - ExecutorService workerExecutor = context.getExecutorServiceManager().newThreadPool(this, "NettyUDPWorker", - configuration.getCorePoolSize(), configuration.getMaxPoolSize()); - - datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor); + workerExecutor = 
context.getExecutorServiceManager().newCachedThreadPool(this, "NettyUDPWorker"); + if (configuration.getWorkerCount() <= 0) { + datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor); + } else { + datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor, configuration.getWorkerCount()); + } connectionlessServerBootstrap = new ConnectionlessBootstrap(datagramChannelFactory); if (configuration.getServerPipelineFactory() != null) { configuration.getServerPipelineFactory().setConsumer(this); @@ -186,6 +198,7 @@ public class NettyConsumer extends Defau } connectionlessServerBootstrap.setOption("child.keepAlive", configuration.isKeepAlive()); connectionlessServerBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay()); + connectionlessServerBootstrap.setOption("reuseAddress", configuration.isReuseAddress()); connectionlessServerBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress()); connectionlessServerBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout()); connectionlessServerBootstrap.setOption("child.broadcast", configuration.isBroadcast()); Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=1310979&r1=1310978&r2=1310979&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Sun Apr 8 11:28:44 2012 @@ -55,6 +55,8 @@ public class NettyProducer extends Defau private ChannelFactory channelFactory; private DatagramChannelFactory datagramChannelFactory; private CamelLogger noReplyLogger; + private ExecutorService bossExecutor; + private 
ExecutorService workerExecutor; public NettyProducer(NettyEndpoint nettyEndpoint, NettyConfiguration configuration) { super(nettyEndpoint); @@ -103,6 +105,7 @@ public class NettyProducer extends Defau protected void doStop() throws Exception { LOG.debug("Stopping producer at address: {}", configuration.getAddress()); // close all channels + LOG.trace("Closing {} channels", ALL_CHANNELS.size()); ChannelGroupFuture future = ALL_CHANNELS.close(); future.awaitUninterruptibly(); @@ -110,6 +113,15 @@ public class NettyProducer extends Defau if (channelFactory != null) { channelFactory.releaseExternalResources(); } + + // and then shutdown the thread pools + if (bossExecutor != null) { + context.getExecutorServiceManager().shutdownNow(bossExecutor); + } + if (workerExecutor != null) { + context.getExecutorServiceManager().shutdownNow(workerExecutor); + } + super.doStop(); } @@ -208,18 +220,15 @@ public class NettyProducer extends Defau protected void setupTCPCommunication() throws Exception { if (channelFactory == null) { - ExecutorService bossExecutor = context.getExecutorServiceManager().newThreadPool(this, "NettyTCPBoss", - configuration.getCorePoolSize(), configuration.getMaxPoolSize()); - ExecutorService workerExecutor = context.getExecutorServiceManager().newThreadPool(this, "NettyTCPWorker", - configuration.getCorePoolSize(), configuration.getMaxPoolSize()); + bossExecutor = context.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPBoss"); + workerExecutor = context.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPWorker"); channelFactory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor); } } protected void setupUDPCommunication() throws Exception { if (datagramChannelFactory == null) { - ExecutorService workerExecutor = context.getExecutorServiceManager().newThreadPool(this, "NettyUDPWorker", - configuration.getCorePoolSize(), configuration.getMaxPoolSize()); + workerExecutor = 
context.getExecutorServiceManager().newCachedThreadPool(this, "NettyUDPWorker"); datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor); } } @@ -243,16 +252,17 @@ public class NettyProducer extends Defau if (isTcp()) { ClientBootstrap clientBootstrap = new ClientBootstrap(channelFactory); - clientBootstrap.setOption("child.keepAlive", configuration.isKeepAlive()); - clientBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay()); - clientBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress()); - clientBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout()); + clientBootstrap.setOption("keepAlive", configuration.isKeepAlive()); + clientBootstrap.setOption("tcpNoDelay", configuration.isTcpNoDelay()); + clientBootstrap.setOption("reuseAddress", configuration.isReuseAddress()); + clientBootstrap.setOption("connectTimeoutMillis", configuration.getConnectTimeout()); // set the pipeline on the bootstrap clientBootstrap.setPipeline(clientPipeline); answer = clientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort())); return answer; } else { + // TODO: Is this correct for a UDP client ConnectionlessBootstrap connectionlessClientBootstrap = new ConnectionlessBootstrap(datagramChannelFactory); connectionlessClientBootstrap.setOption("child.keepAlive", configuration.isKeepAlive()); connectionlessClientBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay()); Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java?rev=1310979&r1=1310978&r2=1310979&view=diff ============================================================================== --- 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java Sun Apr 8 11:28:44 2012 @@ -42,7 +42,7 @@ public class ClientChannelHandler extend private final Exchange exchange; private final AsyncCallback callback; private boolean messageReceived; - private boolean exceptionHandled; + private volatile boolean exceptionHandled; public ClientChannelHandler(NettyProducer producer, Exchange exchange, AsyncCallback callback) { this.producer = producer; Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseConnectionTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseConnectionTest.java?rev=1310979&view=auto ============================================================================== --- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseConnectionTest.java (added) +++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseConnectionTest.java Sun Apr 8 11:28:44 2012 @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty; + +import org.apache.camel.builder.RouteBuilder; +import org.junit.Test; + +/** + * + */ +public class NettyReuseConnectionTest extends BaseNettyTest { + + private String uri = "netty:tcp://localhost:{{port}}?sync=true&disconnect=false"; + + @Test + public void testReuseConnection() throws Exception { + for (int i = 0; i < 20; i++) { + String out = template.requestBody(uri, "" + i, String.class); + assertEquals("Reply " + i, out); + } + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from(uri).transform().simple("Reply ${body}"); + } + }; + } +} Modified: camel/trunk/components/camel-netty/src/test/resources/log4j.properties URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/resources/log4j.properties?rev=1310979&r1=1310978&r2=1310979&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/test/resources/log4j.properties (original) +++ camel/trunk/components/camel-netty/src/test/resources/log4j.properties Sun Apr 8 11:28:44 2012 @@ -23,7 +23,7 @@ log4j.rootLogger=INFO, file # uncomment the following to enable camel debugging #log4j.logger.org.apache.camel.component.netty=TRACE #log4j.logger.org.apache.camel=DEBUG -#log4j.logger.org.apache.commons.net=TRACE +#log4j.logger.org.jboss.netty=TRACE # CONSOLE appender not used by default log4j.appender.out=org.apache.log4j.ConsoleAppender