Updated Branches: refs/heads/master ee6ec2c35 -> 39ce62568
CAMEL-6522: Added bossPool and workerPool options to Netty producer to allow sharing thread pools more easily. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/39ce6256 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/39ce6256 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/39ce6256 Branch: refs/heads/master Commit: 39ce6256870006d0490bac6c7fff1ef2abbd0470 Parents: ee6ec2c Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Jul 9 15:58:23 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Jul 9 16:08:44 2013 +0200 ---------------------------------------------------------------------- .../component/netty/NettyBossPoolBuilder.java | 67 ------------------- .../netty/NettyClientBossPoolBuilder.java | 68 ++++++++++++++++++++ .../camel/component/netty/NettyProducer.java | 56 ++++++++++------ .../netty/NettyServerBossPoolBuilder.java | 67 +++++++++++++++++++ .../SingleTCPNettyServerBootstrapFactory.java | 6 +- .../SingleUDPNettyServerBootstrapFactory.java | 3 +- ...UseSharedWorkerThreadPoolManyRoutesTest.java | 2 +- .../NettyUseSharedWorkerThreadPoolTest.java | 24 ++++--- 8 files changed, 189 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/39ce6256/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyBossPoolBuilder.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyBossPoolBuilder.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyBossPoolBuilder.java deleted file mode 100644 index 13c0876..0000000 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyBossPoolBuilder.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one 
or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.netty; - -import java.util.concurrent.Executors; - -import org.jboss.netty.channel.socket.nio.BossPool; -import org.jboss.netty.channel.socket.nio.NioServerBossPool; - -/** - * A builder to create Netty {@link org.jboss.netty.channel.socket.nio.BossPool} which can be used for sharing boos pools - * with multiple Netty {@link org.apache.camel.component.netty.NettyServerBootstrapFactory} server bootstrap configurations. - */ -public final class NettyBossPoolBuilder { - - private String name = "NettyBoss"; - private String pattern; - private int bossCount = 1; - - public void setName(String name) { - this.name = name; - } - - public void setPattern(String pattern) { - this.pattern = pattern; - } - - public void setBossCount(int bossCount) { - this.bossCount = bossCount; - } - - public NettyBossPoolBuilder withName(String name) { - setName(name); - return this; - } - - public NettyBossPoolBuilder withPattern(String pattern) { - setPattern(pattern); - return this; - } - - public NettyBossPoolBuilder withBossCount(int bossCount) { - setBossCount(bossCount); - return this; - } - - /** - * Creates a new boss pool. 
- */ - BossPool build() { - return new NioServerBossPool(Executors.newCachedThreadPool(), bossCount, new CamelNettyThreadNameDeterminer(pattern, name)); - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/39ce6256/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyClientBossPoolBuilder.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyClientBossPoolBuilder.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyClientBossPoolBuilder.java new file mode 100644 index 0000000..76bd3f9 --- /dev/null +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyClientBossPoolBuilder.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.netty; + +import java.util.concurrent.Executors; + +import org.jboss.netty.channel.socket.nio.BossPool; +import org.jboss.netty.channel.socket.nio.NioClientBossPool; +import org.jboss.netty.util.HashedWheelTimer; + +/** + * A builder to create Netty {@link org.jboss.netty.channel.socket.nio.BossPool} which can be used for sharing boos pools + * with multiple Netty {@link NettyServerBootstrapFactory} server bootstrap configurations. + */ +public final class NettyClientBossPoolBuilder { + + private String name = "NettyClientBoss"; + private String pattern; + private int bossCount = 1; + + public void setName(String name) { + this.name = name; + } + + public void setPattern(String pattern) { + this.pattern = pattern; + } + + public void setBossCount(int bossCount) { + this.bossCount = bossCount; + } + + public NettyClientBossPoolBuilder withName(String name) { + setName(name); + return this; + } + + public NettyClientBossPoolBuilder withPattern(String pattern) { + setPattern(pattern); + return this; + } + + public NettyClientBossPoolBuilder withBossCount(int bossCount) { + setBossCount(bossCount); + return this; + } + + /** + * Creates a new boss pool. 
+ */ + BossPool build() { + return new NioClientBossPool(Executors.newCachedThreadPool(), bossCount, new HashedWheelTimer(), new CamelNettyThreadNameDeterminer(pattern, name)); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/39ce6256/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java index 77cee1c..f36e6f7 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java @@ -19,7 +19,7 @@ package org.apache.camel.component.netty; import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; @@ -45,8 +45,11 @@ import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.group.ChannelGroupFuture; import org.jboss.netty.channel.group.DefaultChannelGroup; import org.jboss.netty.channel.socket.DatagramChannelFactory; +import org.jboss.netty.channel.socket.nio.BossPool; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory; +import org.jboss.netty.channel.socket.nio.NioDatagramWorkerPool; +import org.jboss.netty.channel.socket.nio.WorkerPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,8 +62,8 @@ public class NettyProducer extends DefaultAsyncProducer { private DatagramChannelFactory datagramChannelFactory; private ClientPipelineFactory pipelineFactory; private CamelLogger noReplyLogger; - private 
ExecutorService bossExecutor; - private ExecutorService workerExecutor; + private BossPool bossPool; + private WorkerPool workerPool; private ObjectPool<Channel> pool; public NettyProducer(NettyEndpoint nettyEndpoint, NettyConfiguration configuration) { @@ -154,13 +157,13 @@ public class NettyProducer extends DefaultAsyncProducer { } // and then shutdown the thread pools - if (bossExecutor != null) { - context.getExecutorServiceManager().shutdown(bossExecutor); - bossExecutor = null; + if (bossPool != null) { + bossPool.shutdown(); + bossPool = null; } - if (workerExecutor != null) { - context.getExecutorServiceManager().shutdown(workerExecutor); - workerExecutor = null; + if (workerPool != null) { + workerPool.shutdown(); + workerPool = null; } if (pool != null) { @@ -316,24 +319,35 @@ public class NettyProducer extends DefaultAsyncProducer { protected void setupTCPCommunication() throws Exception { if (channelFactory == null) { - bossExecutor = context.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPBoss"); - workerExecutor = context.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPWorker"); - if (configuration.getWorkerCount() <= 0) { - channelFactory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor); - } else { - channelFactory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor, configuration.getWorkerCount()); + // prefer using explicit configured thread pools + BossPool bp = configuration.getBossPool(); + WorkerPool wp = configuration.getWorkerPool(); + + if (bp == null) { + // create new pool which we should shutdown when stopping as its not shared + bossPool = new NettyClientBossPoolBuilder() + .withBossCount(configuration.getBossCount()) + .withName("NettyClientTCPBoss") + .build(); + bp = bossPool; } + if (wp == null) { + // create new pool which we should shutdown when stopping as its not shared + workerPool = new NettyWorkerPoolBuilder() + .withWorkerCount(configuration.getWorkerCount()) + 
.withName("NettyClientTCPWorker") + .build(); + wp = workerPool; + } + channelFactory = new NioClientSocketChannelFactory(bp, wp); } } protected void setupUDPCommunication() throws Exception { if (datagramChannelFactory == null) { - workerExecutor = context.getExecutorServiceManager().newCachedThreadPool(this, "NettyUDPWorker"); - if (configuration.getWorkerCount() <= 0) { - datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor); - } else { - datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor, configuration.getWorkerCount()); - } + int count = configuration.getWorkerCount() > 0 ? configuration.getWorkerCount() : NettyHelper.DEFAULT_IO_THREADS; + workerPool = new NioDatagramWorkerPool(Executors.newCachedThreadPool(), count); + datagramChannelFactory = new NioDatagramChannelFactory(workerPool); } } http://git-wip-us.apache.org/repos/asf/camel/blob/39ce6256/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBossPoolBuilder.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBossPoolBuilder.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBossPoolBuilder.java new file mode 100644 index 0000000..edb360f --- /dev/null +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBossPoolBuilder.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty; + +import java.util.concurrent.Executors; + +import org.jboss.netty.channel.socket.nio.BossPool; +import org.jboss.netty.channel.socket.nio.NioServerBossPool; + +/** + * A builder to create Netty {@link org.jboss.netty.channel.socket.nio.BossPool} which can be used for sharing boos pools + * with multiple Netty {@link org.apache.camel.component.netty.NettyServerBootstrapFactory} server bootstrap configurations. + */ +public final class NettyServerBossPoolBuilder { + + private String name = "NettyServerBoss"; + private String pattern; + private int bossCount = 1; + + public void setName(String name) { + this.name = name; + } + + public void setPattern(String pattern) { + this.pattern = pattern; + } + + public void setBossCount(int bossCount) { + this.bossCount = bossCount; + } + + public NettyServerBossPoolBuilder withName(String name) { + setName(name); + return this; + } + + public NettyServerBossPoolBuilder withPattern(String pattern) { + setPattern(pattern); + return this; + } + + public NettyServerBossPoolBuilder withBossCount(int bossCount) { + setBossCount(bossCount); + return this; + } + + /** + * Creates a new boss pool. 
+ */ + BossPool build() { + return new NioServerBossPool(Executors.newCachedThreadPool(), bossCount, new CamelNettyThreadNameDeterminer(pattern, name)); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/39ce6256/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java index 3533116..9e06622 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java @@ -104,9 +104,9 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme if (bp == null) { // create new pool which we should shutdown when stopping as its not shared - bossPool = new NettyBossPoolBuilder() + bossPool = new NettyServerBossPoolBuilder() .withBossCount(configuration.getBossCount()) - .withName("NettyTCPBoss") + .withName("NettyServerTCPBoss") .build(); bp = bossPool; } @@ -114,7 +114,7 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme // create new pool which we should shutdown when stopping as its not shared workerPool = new NettyWorkerPoolBuilder() .withWorkerCount(configuration.getWorkerCount()) - .withName("NettyTCPWorker") + .withName("NettyServerTCPWorker") .build(); wp = workerPool; } http://git-wip-us.apache.org/repos/asf/camel/blob/39ce6256/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java ---------------------------------------------------------------------- diff --git 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java index ae9aaef..5c59166 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java @@ -43,7 +43,6 @@ import org.slf4j.LoggerFactory; public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport implements NettyServerBootstrapFactory { protected static final Logger LOG = LoggerFactory.getLogger(SingleUDPNettyServerBootstrapFactory.class); - private static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2; private final ChannelGroup allChannels; private CamelContext camelContext; private ThreadFactory threadFactory; @@ -101,7 +100,7 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme protected void startServerBootstrap() { // create non-shared worker pool - int count = configuration.getWorkerCount() > 0 ? configuration.getWorkerCount() : DEFAULT_IO_THREADS; + int count = configuration.getWorkerCount() > 0 ? 
configuration.getWorkerCount() : NettyHelper.DEFAULT_IO_THREADS; workerPool = new NioDatagramWorkerPool(Executors.newCachedThreadPool(), count); datagramChannelFactory = new NioDatagramChannelFactory(workerPool); http://git-wip-us.apache.org/repos/asf/camel/blob/39ce6256/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolManyRoutesTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolManyRoutesTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolManyRoutesTest.java index 14e1c78..cf45e69 100644 --- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolManyRoutesTest.java +++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolManyRoutesTest.java @@ -67,7 +67,7 @@ public class NettyUseSharedWorkerThreadPoolManyRoutesTest extends BaseNettyTest public void configure() throws Exception { sharedWorker = new NettyWorkerPoolBuilder().withWorkerCount(10).build(); jndi.bind("sharedWorker", sharedWorker); - sharedBoos = new NettyBossPoolBuilder().withBossCount(20).build(); + sharedBoos = new NettyServerBossPoolBuilder().withBossCount(20).build(); jndi.bind("sharedBoss", sharedBoos); for (int i = 0; i < 100; i++) { http://git-wip-us.apache.org/repos/asf/camel/blob/39ce6256/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolTest.java index f32de27..fd28f25 100644 --- 
a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolTest.java +++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolTest.java @@ -27,7 +27,8 @@ import org.junit.Test; public class NettyUseSharedWorkerThreadPoolTest extends BaseNettyTest { private JndiRegistry jndi; - private WorkerPool shared; + private WorkerPool sharedServer; + private WorkerPool sharedClient; private int port; private int port2; private int port3; @@ -48,19 +49,20 @@ public class NettyUseSharedWorkerThreadPoolTest extends BaseNettyTest { getMockEndpoint("mock:result").expectedMessageCount(30); for (int i = 0; i < 10; i++) { - String reply = template.requestBody("netty:tcp://localhost:" + port + "?textline=true&sync=true", "Hello World", String.class); + String reply = template.requestBody("netty:tcp://localhost:" + port + "?textline=true&sync=true&workerPool=#sharedClientPool", "Hello World", String.class); assertEquals("Bye World", reply); - reply = template.requestBody("netty:tcp://localhost:" + port2 + "?textline=true&sync=true", "Hello Camel", String.class); + reply = template.requestBody("netty:tcp://localhost:" + port2 + "?textline=true&sync=true&workerPool=#sharedClientPool", "Hello Camel", String.class); assertEquals("Hi Camel", reply); - reply = template.requestBody("netty:tcp://localhost:" + port3 + "?textline=true&sync=true", "Hello Claus", String.class); + reply = template.requestBody("netty:tcp://localhost:" + port3 + "?textline=true&sync=true&workerPool=#sharedClientPool", "Hello Claus", String.class); assertEquals("Hej Claus", reply); } assertMockEndpointsSatisfied(); - shared.shutdown(); + sharedServer.shutdown(); + sharedClient.shutdown(); } @Override @@ -69,26 +71,28 @@ public class NettyUseSharedWorkerThreadPoolTest extends BaseNettyTest { @Override public void configure() throws Exception { // we have 3 routes, but lets try to have only 2 threads in the pool - shared = new 
NettyWorkerPoolBuilder().withWorkerCount(2).build(); - jndi.bind("sharedPool", shared); + sharedServer = new NettyWorkerPoolBuilder().withWorkerCount(2).withName("NettyServer").build(); + jndi.bind("sharedServerPool", sharedServer); + sharedClient = new NettyWorkerPoolBuilder().withWorkerCount(3).withName("NettyClient").build(); + jndi.bind("sharedClientPool", sharedClient); port = getPort(); port2 = getNextPort(); port3 = getNextPort(); - from("netty:tcp://localhost:" + port + "?textline=true&sync=true&workerPool=#sharedPool&orderedThreadPoolExecutor=false") + from("netty:tcp://localhost:" + port + "?textline=true&sync=true&workerPool=#sharedServerPool&orderedThreadPoolExecutor=false") .validate(body().isInstanceOf(String.class)) .to("log:result") .to("mock:result") .transform(body().regexReplaceAll("Hello", "Bye")); - from("netty:tcp://localhost:" + port2 + "?textline=true&sync=true&workerPool=#sharedPool&orderedThreadPoolExecutor=false") + from("netty:tcp://localhost:" + port2 + "?textline=true&sync=true&workerPool=#sharedServerPool&orderedThreadPoolExecutor=false") .validate(body().isInstanceOf(String.class)) .to("log:result") .to("mock:result") .transform(body().regexReplaceAll("Hello", "Hi")); - from("netty:tcp://localhost:" + port3 + "?textline=true&sync=true&workerPool=#sharedPool&orderedThreadPoolExecutor=false") + from("netty:tcp://localhost:" + port3 + "?textline=true&sync=true&workerPool=#sharedServerPool&orderedThreadPoolExecutor=false") .validate(body().isInstanceOf(String.class)) .to("log:result") .to("mock:result")