Updated Branches: refs/heads/master 371efe7e0 -> 4664d64c6
CAMEL-6522: Added bossPool and workerPool options to Netty consumer to allow sharing thread pools more easily. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4664d64c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4664d64c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4664d64c Branch: refs/heads/master Commit: 4664d64c69840637b564b5f65c5ec0f026e46c10 Parents: 371efe7 Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Jul 9 15:07:12 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Jul 9 15:27:42 2013 +0200 ---------------------------------------------------------------------- .../netty/CamelNettyThreadNameDeterminer.java | 36 +++++++ .../component/netty/NettyBossPoolBuilder.java | 67 +++++++++++++ .../camel/component/netty/NettyHelper.java | 1 + .../NettyServerBootstrapConfiguration.java | 39 ++++++++ .../component/netty/NettyWorkerPoolBuilder.java | 80 ++++++++++++++++ .../SingleTCPNettyServerBootstrapFactory.java | 61 ++++++------ .../SingleUDPNettyServerBootstrapFactory.java | 31 +++--- ...UseSharedWorkerThreadPoolManyRoutesTest.java | 84 +++++++++++++++++ .../NettyUseSharedWorkerThreadPoolTest.java | 99 ++++++++++++++++++++ ...pringNettyUseSharedWorkerThreadPoolTest.java | 51 ++++++++++ .../src/test/resources/log4j.properties | 2 +- ...SpringNettyUseSharedWorkerThreadPoolTest.xml | 52 ++++++++++ 12 files changed, 553 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/main/java/org/apache/camel/component/netty/CamelNettyThreadNameDeterminer.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/CamelNettyThreadNameDeterminer.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/CamelNettyThreadNameDeterminer.java new file mode 100644 index 0000000..c18ce69 --- /dev/null +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/CamelNettyThreadNameDeterminer.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty; + +import org.apache.camel.util.concurrent.ThreadHelper; +import org.jboss.netty.util.ThreadNameDeterminer; + +public class CamelNettyThreadNameDeterminer implements ThreadNameDeterminer { + + private final String pattern; + private final String name; + + public CamelNettyThreadNameDeterminer(String pattern, String name) { + this.pattern = pattern; + this.name = name; + } + + @Override + public String determineThreadName(String currentThreadName, String proposedThreadName) throws Exception { + return ThreadHelper.resolveThreadName(pattern, name); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyBossPoolBuilder.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyBossPoolBuilder.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyBossPoolBuilder.java new file mode 100644 index 0000000..13c0876 --- /dev/null +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyBossPoolBuilder.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty; + +import java.util.concurrent.Executors; + +import org.jboss.netty.channel.socket.nio.BossPool; +import org.jboss.netty.channel.socket.nio.NioServerBossPool; + +/** + * A builder to create Netty {@link org.jboss.netty.channel.socket.nio.BossPool} which can be used for sharing boos pools + * with multiple Netty {@link org.apache.camel.component.netty.NettyServerBootstrapFactory} server bootstrap configurations. + */ +public final class NettyBossPoolBuilder { + + private String name = "NettyBoss"; + private String pattern; + private int bossCount = 1; + + public void setName(String name) { + this.name = name; + } + + public void setPattern(String pattern) { + this.pattern = pattern; + } + + public void setBossCount(int bossCount) { + this.bossCount = bossCount; + } + + public NettyBossPoolBuilder withName(String name) { + setName(name); + return this; + } + + public NettyBossPoolBuilder withPattern(String pattern) { + setPattern(pattern); + return this; + } + + public NettyBossPoolBuilder withBossCount(int bossCount) { + setBossCount(bossCount); + return this; + } + + /** + * Creates a new boss pool. + */ + BossPool build() { + return new NioServerBossPool(Executors.newCachedThreadPool(), bossCount, new CamelNettyThreadNameDeterminer(pattern, name)); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java index 36d325a..05f3e4d 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java @@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory; */ public final class NettyHelper { + public static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2; private static final Logger LOG = LoggerFactory.getLogger(NettyHelper.class); private NettyHelper() { http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java index 6ea9d9e..7df49b0 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java @@ -18,8 +18,11 @@ package org.apache.camel.component.netty; import java.io.File; import java.util.Map; +import java.util.concurrent.ExecutorService; import org.apache.camel.util.jsse.SSLContextParameters; +import org.jboss.netty.channel.socket.nio.BossPool; +import org.jboss.netty.channel.socket.nio.WorkerPool; import org.jboss.netty.handler.ssl.SslHandler; public class NettyServerBootstrapConfiguration implements Cloneable { @@ -31,6 +34,7 @@ public class NettyServerBootstrapConfiguration implements Cloneable { protected long sendBufferSize = 65536; protected long receiveBufferSize = 65536; protected int receiveBufferSizePredictor; + protected int bossCount = 1; protected int workerCount; protected boolean keepAlive = true; protected boolean tcpNoDelay = true; @@ -52,6 +56,8 @@ public class NettyServerBootstrapConfiguration implements Cloneable { protected String keyStoreFormat; protected String securityProvider; protected String passphrase; + protected BossPool bossPool; + protected WorkerPool workerPool; public String getAddress() { return host + ":" + port; @@ -125,6 +131,14 @@ public class NettyServerBootstrapConfiguration implements Cloneable { this.workerCount = workerCount; } + public int getBossCount() { + return bossCount; + } + + public void setBossCount(int bossCount) { + this.bossCount = bossCount; + } + public boolean isKeepAlive() { return keepAlive; } @@ -281,6 +295,22 @@ public class NettyServerBootstrapConfiguration implements Cloneable { this.options = options; } + public BossPool getBossPool() { + return bossPool; + } + + public void setBossPool(BossPool bossPool) { + this.bossPool = bossPool; + } + + public WorkerPool getWorkerPool() { + return workerPool; + } + + public void setWorkerPool(WorkerPool workerPool) { + this.workerPool = workerPool; + } + /** * Checks if the other {@link NettyServerBootstrapConfiguration} is compatible * with this, as a Netty listener bound on port X shares the same common @@ -305,6 +335,8 @@ public class NettyServerBootstrapConfiguration implements Cloneable { isCompatible = false; } else if (workerCount != other.workerCount) { isCompatible = false; + } else if (bossCount != other.bossCount) { + isCompatible = false; } else if (keepAlive != other.keepAlive) { isCompatible = false; } else if (tcpNoDelay != other.tcpNoDelay) { @@ -352,6 +384,10 @@ public class NettyServerBootstrapConfiguration implements Cloneable { isCompatible = false; } else if (passphrase != null && !passphrase.equals(other.passphrase)) { isCompatible = false; + } else if (bossPool != other.bossPool) { + isCompatible = false; + } else if (workerPool != other.workerPool) { + isCompatible = false; } return isCompatible; @@ -367,6 +403,7 @@ public class NettyServerBootstrapConfiguration implements Cloneable { + ", receiveBufferSize=" + receiveBufferSize + ", receiveBufferSizePredictor=" + receiveBufferSizePredictor + ", workerCount=" + workerCount + + ", bossCount=" + bossCount + ", keepAlive=" + keepAlive + ", tcpNoDelay=" + tcpNoDelay + ", reuseAddress=" + reuseAddress @@ -386,6 +423,8 @@ public class NettyServerBootstrapConfiguration implements Cloneable { + ", keyStoreFormat='" + keyStoreFormat + '\'' + ", securityProvider='" + securityProvider + '\'' + ", passphrase='" + passphrase + '\'' + + ", bossPool=" + bossPool + + ", workerPool=" + workerPool + '}'; } } http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyWorkerPoolBuilder.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyWorkerPoolBuilder.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyWorkerPoolBuilder.java new file mode 100644 index 0000000..2f175d5 --- /dev/null +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyWorkerPoolBuilder.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty; + +import java.util.concurrent.Executors; + +import org.jboss.netty.channel.socket.nio.NioWorkerPool; +import org.jboss.netty.channel.socket.nio.WorkerPool; + +/** + * A builder to create Netty {@link WorkerPool} which can be used for sharing worker pools + * with multiple Netty {@link NettyServerBootstrapFactory} server bootstrap configurations. + */ +public final class NettyWorkerPoolBuilder { + + private String name = "NettyWorker"; + private String pattern; + private int workerCount; + private volatile WorkerPool workerPool; + + public void setName(String name) { + this.name = name; + } + + public void setPattern(String pattern) { + this.pattern = pattern; + } + + public void setWorkerCount(int workerCount) { + this.workerCount = workerCount; + } + + public NettyWorkerPoolBuilder withName(String name) { + setName(name); + return this; + } + + public NettyWorkerPoolBuilder withPattern(String pattern) { + setPattern(pattern); + return this; + } + + public NettyWorkerPoolBuilder withWorkerCount(int workerCount) { + setWorkerCount(workerCount); + return this; + } + + /** + * Creates a new worker pool. + */ + WorkerPool build() { + int count = workerCount > 0 ? workerCount : NettyHelper.DEFAULT_IO_THREADS; + workerPool = new NioWorkerPool(Executors.newCachedThreadPool(), count, new CamelNettyThreadNameDeterminer(pattern, name)); + return workerPool; + } + + /** + * Shutdown the created worker pool + */ + public void destroy() { + if (workerPool != null) { + workerPool.shutdown(); + workerPool = null; + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java index 23e90b0..3533116 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java @@ -18,8 +18,6 @@ package org.apache.camel.component.netty; import java.net.InetSocketAddress; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import org.apache.camel.CamelContext; @@ -31,7 +29,9 @@ import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.group.ChannelGroupFuture; import org.jboss.netty.channel.group.DefaultChannelGroup; +import org.jboss.netty.channel.socket.nio.BossPool; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; +import org.jboss.netty.channel.socket.nio.WorkerPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,8 +49,8 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme private ChannelFactory channelFactory; private ServerBootstrap serverBootstrap; private Channel channel; - private ExecutorService bossExecutor; - private ExecutorService workerExecutor; + private BossPool bossPool; + private WorkerPool workerPool; public SingleTCPNettyServerBootstrapFactory() { this.allChannels = new DefaultChannelGroup(SingleTCPNettyServerBootstrapFactory.class.getName()); @@ -98,21 +98,29 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme } protected void startServerBootstrap() { - if (camelContext != null) { - bossExecutor = camelContext.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPBoss"); - workerExecutor = camelContext.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPWorker"); - } else { - bossExecutor = Executors.newCachedThreadPool(threadFactory); - workerExecutor = Executors.newCachedThreadPool(threadFactory); + // prefer using explicit configured thread pools + BossPool bp = configuration.getBossPool(); + WorkerPool wp = configuration.getWorkerPool(); + + if (bp == null) { + // create new pool which we should shutdown when stopping as its not shared + bossPool = new NettyBossPoolBuilder() + .withBossCount(configuration.getBossCount()) + .withName("NettyTCPBoss") + .build(); + bp = bossPool; } - - if (configuration.getWorkerCount() <= 0) { - channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor); - } else { - channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor, - configuration.getWorkerCount()); + if (wp == null) { + // create new pool which we should shutdown when stopping as its not shared + workerPool = new NettyWorkerPoolBuilder() + .withWorkerCount(configuration.getWorkerCount()) + .withName("NettyTCPWorker") + .build(); + wp = workerPool; } + channelFactory = new NioServerSocketChannelFactory(bp, wp); + serverBootstrap = new ServerBootstrap(channelFactory); serverBootstrap.setOption("child.keepAlive", configuration.isKeepAlive()); serverBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay()); @@ -156,21 +164,14 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme } // and then shutdown the thread pools - if (bossExecutor != null) { - if (camelContext != null) { - camelContext.getExecutorServiceManager().shutdown(bossExecutor); - } else { - bossExecutor.shutdownNow(); - } - bossExecutor = null; + if (bossPool != null) { + bossPool.shutdown(); + bossPool = null; } - if (workerExecutor != null) { - if (camelContext != null) { - camelContext.getExecutorServiceManager().shutdown(workerExecutor); - } else { - workerExecutor.shutdownNow(); - } - workerExecutor = null; + if (workerPool != null) { + workerPool.shutdown(); + workerPool = null; } } + } http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java index 53bc792..ae9aaef 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java @@ -18,7 +18,6 @@ package org.apache.camel.component.netty; import java.net.InetSocketAddress; import java.util.Map; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -33,6 +32,8 @@ import org.jboss.netty.channel.group.ChannelGroupFuture; import org.jboss.netty.channel.group.DefaultChannelGroup; import org.jboss.netty.channel.socket.DatagramChannelFactory; import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory; +import org.jboss.netty.channel.socket.nio.NioDatagramWorkerPool; +import org.jboss.netty.channel.socket.nio.WorkerPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +43,7 @@ import org.slf4j.LoggerFactory; public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport implements NettyServerBootstrapFactory { protected static final Logger LOG = LoggerFactory.getLogger(SingleUDPNettyServerBootstrapFactory.class); + private static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2; private final ChannelGroup allChannels; private CamelContext camelContext; private ThreadFactory threadFactory; @@ -50,7 +52,7 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme private DatagramChannelFactory datagramChannelFactory; private ConnectionlessBootstrap connectionlessBootstrap; private Channel channel; - private ExecutorService workerExecutor; + private WorkerPool workerPool; public SingleUDPNettyServerBootstrapFactory() { this.allChannels = new DefaultChannelGroup(SingleUDPNettyServerBootstrapFactory.class.getName()); @@ -98,17 +100,11 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme } protected void startServerBootstrap() { - if (camelContext != null) { - workerExecutor = camelContext.getExecutorServiceManager().newCachedThreadPool(this, "NettyUDPWorker"); - } else { - workerExecutor = Executors.newCachedThreadPool(threadFactory); - } + // create non-shared worker pool + int count = configuration.getWorkerCount() > 0 ? configuration.getWorkerCount() : DEFAULT_IO_THREADS; + workerPool = new NioDatagramWorkerPool(Executors.newCachedThreadPool(), count); - if (configuration.getWorkerCount() <= 0) { - datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor); - } else { - datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor, configuration.getWorkerCount()); - } + datagramChannelFactory = new NioDatagramChannelFactory(workerPool); connectionlessBootstrap = new ConnectionlessBootstrap(datagramChannelFactory); connectionlessBootstrap.setOption("child.keepAlive", configuration.isKeepAlive()); @@ -161,13 +157,10 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme } // and then shutdown the thread pools - if (workerExecutor != null) { - if (camelContext != null) { - camelContext.getExecutorServiceManager().shutdown(workerExecutor); - } else { - workerExecutor.shutdownNow(); - } - workerExecutor = null; + if (workerPool != null) { + workerPool.shutdown(); + workerPool = null; } } + } http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolManyRoutesTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolManyRoutesTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolManyRoutesTest.java new file mode 100644 index 0000000..14e1c78 --- /dev/null +++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolManyRoutesTest.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.JndiRegistry; +import org.jboss.netty.channel.socket.nio.BossPool; +import org.jboss.netty.channel.socket.nio.WorkerPool; +import org.junit.Test; + +/** + * @version + */ +public class NettyUseSharedWorkerThreadPoolManyRoutesTest extends BaseNettyTest { + + private JndiRegistry jndi; + private BossPool sharedBoos; + private WorkerPool sharedWorker; + private int before; + + @Override + protected boolean useJmx() { + return true; + } + + @Override + public void setUp() throws Exception { + before = Thread.activeCount(); + super.setUp(); + } + + @Override + protected JndiRegistry createRegistry() throws Exception { + jndi = super.createRegistry(); + return jndi; + } + + @Test + public void testSharedThreadPool() throws Exception { + int delta = Thread.activeCount() - before; + + log.info("Created threads {}", delta); + assertTrue("There should not be created so many threads: " + delta, delta < 50); + + sharedWorker.shutdown(); + sharedBoos.shutdown(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + sharedWorker = new NettyWorkerPoolBuilder().withWorkerCount(10).build(); + jndi.bind("sharedWorker", sharedWorker); + sharedBoos = new NettyBossPoolBuilder().withBossCount(20).build(); + jndi.bind("sharedBoss", sharedBoos); + + for (int i = 0; i < 100; i++) { + from("netty:tcp://localhost:" + getNextPort() + "?textline=true&sync=true&orderedThreadPoolExecutor=false" + + "&bossPool=#sharedBoss&workerPool=#sharedWorker") + .validate(body().isInstanceOf(String.class)) + .to("log:result") + .to("mock:result") + .transform(body().regexReplaceAll("Hello", "Bye")); + } + } + }; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolTest.java new file mode 100644 index 0000000..f32de27 --- /dev/null +++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolTest.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.JndiRegistry; +import org.jboss.netty.channel.socket.nio.WorkerPool; +import org.junit.Test; + +/** + * @version + */ +public class NettyUseSharedWorkerThreadPoolTest extends BaseNettyTest { + + private JndiRegistry jndi; + private WorkerPool shared; + private int port; + private int port2; + private int port3; + + @Override + protected boolean useJmx() { + return true; + } + + @Override + protected JndiRegistry createRegistry() throws Exception { + jndi = super.createRegistry(); + return jndi; + } + + @Test + public void testSharedThreadPool() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(30); + + for (int i = 0; i < 10; i++) { + String reply = template.requestBody("netty:tcp://localhost:" + port + "?textline=true&sync=true", "Hello World", String.class); + assertEquals("Bye World", reply); + + reply = template.requestBody("netty:tcp://localhost:" + port2 + "?textline=true&sync=true", "Hello Camel", String.class); + assertEquals("Hi Camel", reply); + + reply = template.requestBody("netty:tcp://localhost:" + port3 + "?textline=true&sync=true", "Hello Claus", String.class); + assertEquals("Hej Claus", reply); + } + + assertMockEndpointsSatisfied(); + + shared.shutdown(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // we have 3 routes, but lets try to have only 2 threads in the pool + shared = new NettyWorkerPoolBuilder().withWorkerCount(2).build(); + jndi.bind("sharedPool", shared); + + port = getPort(); + port2 = getNextPort(); + port3 = getNextPort(); + + from("netty:tcp://localhost:" + port + "?textline=true&sync=true&workerPool=#sharedPool&orderedThreadPoolExecutor=false") + .validate(body().isInstanceOf(String.class)) + .to("log:result") + .to("mock:result") + .transform(body().regexReplaceAll("Hello", "Bye")); + + from("netty:tcp://localhost:" + port2 + "?textline=true&sync=true&workerPool=#sharedPool&orderedThreadPoolExecutor=false") + .validate(body().isInstanceOf(String.class)) + .to("log:result") + .to("mock:result") + .transform(body().regexReplaceAll("Hello", "Hi")); + + from("netty:tcp://localhost:" + port3 + "?textline=true&sync=true&workerPool=#sharedPool&orderedThreadPoolExecutor=false") + .validate(body().isInstanceOf(String.class)) + .to("log:result") + .to("mock:result") + .transform(body().regexReplaceAll("Hello", "Hej")); + } + }; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/test/java/org/apache/camel/component/netty/SpringNettyUseSharedWorkerThreadPoolTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/SpringNettyUseSharedWorkerThreadPoolTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/SpringNettyUseSharedWorkerThreadPoolTest.java new file mode 100644 index 0000000..0cc1bd6 --- /dev/null +++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/SpringNettyUseSharedWorkerThreadPoolTest.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty; + +import org.apache.camel.test.spring.CamelSpringTestSupport; +import org.junit.Test; +import org.springframework.context.support.AbstractApplicationContext; +import org.springframework.context.support.ClassPathXmlApplicationContext; + +/** + * @version + */ +public class SpringNettyUseSharedWorkerThreadPoolTest extends CamelSpringTestSupport { + + @Test + public void testSharedThreadPool() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(30); + + for (int i = 0; i < 10; i++) { + String reply = template.requestBody("netty:tcp://localhost:5021?textline=true&sync=true", "Hello World", String.class); + assertEquals("Hello World", reply); + + reply = template.requestBody("netty:tcp://localhost:5022?textline=true&sync=true", "Hello Camel", String.class); + assertEquals("Hello Camel", reply); + + reply = template.requestBody("netty:tcp://localhost:5023?textline=true&sync=true", "Hello Claus", String.class); + assertEquals("Hello Claus", reply); + } + + assertMockEndpointsSatisfied(); + } + + @Override + protected AbstractApplicationContext createApplicationContext() { + return new ClassPathXmlApplicationContext("org/apache/camel/component/netty/SpringNettyUseSharedWorkerThreadPoolTest.xml"); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/test/resources/log4j.properties b/components/camel-netty/src/test/resources/log4j.properties index 1fa01e4..35d1b54 100644 --- a/components/camel-netty/src/test/resources/log4j.properties +++ b/components/camel-netty/src/test/resources/log4j.properties @@ -29,7 +29,7 @@ log4j.rootLogger=INFO, file log4j.appender.out=org.apache.log4j.ConsoleAppender log4j.appender.out.layout=org.apache.log4j.PatternLayout #log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n -log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n +log4j.appender.out.layout.ConversionPattern=%d [%-35.35t] %-5p %-30.30c{1} - %m%n # File appender log4j.appender.file=org.apache.log4j.FileAppender http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/test/resources/org/apache/camel/component/netty/SpringNettyUseSharedWorkerThreadPoolTest.xml ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/test/resources/org/apache/camel/component/netty/SpringNettyUseSharedWorkerThreadPoolTest.xml b/components/camel-netty/src/test/resources/org/apache/camel/component/netty/SpringNettyUseSharedWorkerThreadPoolTest.xml new file mode 100644 index 0000000..7ea4d2d --- /dev/null +++ b/components/camel-netty/src/test/resources/org/apache/camel/component/netty/SpringNettyUseSharedWorkerThreadPoolTest.xml @@ -0,0 +1,52 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd + http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> + + <!-- use the worker pool builder to create to help create the shared thread pool --> + <bean id="poolBuilder" class="org.apache.camel.component.netty.NettyWorkerPoolBuilder"> + <property name="workerCount" value="2"/> + </bean> + + <!-- the shared worker thread pool --> + <bean id="sharedPool" class="org.jboss.netty.channel.socket.nio.WorkerPool" + factory-bean="poolBuilder" factory-method="build" destroy-method="shutdown"> + </bean> + + <camelContext xmlns="http://camel.apache.org/schema/spring"> + <route> + <from uri="netty:tcp://localhost:5021?textline=true&sync=true&workerPool=#sharedPool&orderedThreadPoolExecutor=false"/> + <to uri="log:result"/> + <to uri="mock:result"/> + </route> + + <route> + <from uri="netty:tcp://localhost:5022?textline=true&sync=true&workerPool=#sharedPool&orderedThreadPoolExecutor=false"/> + <to uri="log:result"/> + <to uri="mock:result"/> + </route> + + <route> + <from uri="netty:tcp://localhost:5023?textline=true&sync=true&workerPool=#sharedPool&orderedThreadPoolExecutor=false"/> + <to uri="log:result"/> + <to uri="mock:result"/> + </route> + </camelContext> + +</beans>