Updated Branches: refs/heads/master b60b6cc4f -> a70a94fd1
CAMEL-6488: camel-netty-http allow to share port in OSGi environment. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a70a94fd Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a70a94fd Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a70a94fd Branch: refs/heads/master Commit: a70a94fd194a824325831be184ae96e07b264e74 Parents: b60b6cc Author: Claus Ibsen <davscl...@apache.org> Authored: Wed Jun 26 13:18:35 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Jun 26 13:25:16 2013 +0200 ---------------------------------------------------------------------- .../http/DefaultNettySharedHttpServer.java | 19 ++++++++++-- .../netty/http/NettySharedHttpServer.java | 5 +++ .../netty/http/NettySharedHttpServerTest.java | 2 ++ .../netty/NettyServerBootstrapFactory.java | 15 +++++++-- .../SingleTCPNettyServerBootstrapFactory.java | 32 ++++++++++++-------- .../SingleUDPNettyServerBootstrapFactory.java | 14 +++++++-- examples/camel-example-netty-http/README.txt | 2 +- 7 files changed, 69 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/a70a94fd/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettySharedHttpServer.java ---------------------------------------------------------------------- diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettySharedHttpServer.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettySharedHttpServer.java index c264128..e3993e1 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettySharedHttpServer.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettySharedHttpServer.java @@ -16,12 +16,16 @@ */ package org.apache.camel.component.netty.http; +import java.util.concurrent.ThreadFactory; +import java.util.regex.Matcher; + import org.apache.camel.component.netty.NettyServerBootstrapFactory; import org.apache.camel.component.netty.http.handlers.HttpServerMultiplexChannelHandler; import org.apache.camel.spi.ClassResolver; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ServiceHelper; +import org.apache.camel.util.concurrent.CamelThreadFactory; import org.jboss.netty.channel.ChannelPipelineFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,8 +36,8 @@ import org.slf4j.LoggerFactory; public class DefaultNettySharedHttpServer extends ServiceSupport implements NettySharedHttpServer { // TODO: option to enlist in JMX - // TODO: option to configure thread name pattern for the shared jetty threads + public static final String DEFAULT_PATTERN = "Camel Thread ##counter# - #name#:#port#"; private static final Logger LOG = LoggerFactory.getLogger(DefaultNettySharedHttpServer.class); private NettySharedHttpServerBootstrapConfiguration configuration; @@ -41,6 +45,7 @@ public class DefaultNettySharedHttpServer extends ServiceSupport implements Nett private HttpServerBootstrapFactory bootstrapFactory; private ClassResolver classResolver; private boolean startServer = true; + private String threadPattern = DEFAULT_PATTERN; public void setNettyServerBootstrapConfiguration(NettySharedHttpServerBootstrapConfiguration configuration) { this.configuration = configuration; @@ -74,6 +79,10 @@ public class DefaultNettySharedHttpServer extends ServiceSupport implements Nett this.startServer = startServer; } + public void setThreadNamePattern(String pattern) { + this.threadPattern = pattern; + } + protected void doStart() throws Exception { ObjectHelper.notNull(configuration, "setNettyServerBootstrapConfiguration() must be called with a NettyServerBootstrapConfiguration instance", this); @@ -96,9 +105,15 @@ public class DefaultNettySharedHttpServer extends ServiceSupport implements Nett ChannelPipelineFactory pipelineFactory = new HttpServerSharedPipelineFactory(configuration, channelFactory, classResolver); + // thread factory and pattern + String port = Matcher.quoteReplacement("" + configuration.getPort()); + String pattern = threadPattern; + pattern = pattern.replaceFirst("#port#", port); + ThreadFactory tf = new CamelThreadFactory(pattern, "NettySharedHttpServer", true); + // create bootstrap factory and disable compatible check as its shared among the consumers bootstrapFactory = new HttpServerBootstrapFactory(channelFactory, false); - bootstrapFactory.init(null, configuration, pipelineFactory); + bootstrapFactory.init(tf, configuration, pipelineFactory); ServiceHelper.startServices(channelFactory); http://git-wip-us.apache.org/repos/asf/camel/blob/a70a94fd/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettySharedHttpServer.java ---------------------------------------------------------------------- diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettySharedHttpServer.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettySharedHttpServer.java index 22110bb..b059d13 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettySharedHttpServer.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettySharedHttpServer.java @@ -47,6 +47,11 @@ public interface NettySharedHttpServer extends Service { void setStartServer(boolean startServer); /** + * Sets a custom thread name pattern to be used for naming the Netty HTTP server threads. + */ + void setThreadNamePattern(String pattern); + + /** * Gets the port number this Netty HTTP server uses. */ int getPort(); http://git-wip-us.apache.org/repos/asf/camel/blob/a70a94fd/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettySharedHttpServerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettySharedHttpServerTest.java b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettySharedHttpServerTest.java index a268ba9..bf87b62 100644 --- a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettySharedHttpServerTest.java +++ b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettySharedHttpServerTest.java @@ -72,11 +72,13 @@ public class NettySharedHttpServerTest extends BaseNettyTest { public void configure() throws Exception { // we are using a shared netty http server, so the port number is not needed to be defined in the uri from("netty-http:http://localhost/foo?nettySharedHttpServer=#myNettyServer") + .log("Foo route using thread ${threadName}") .to("mock:foo") .transform().constant("Bye World"); // we are using a shared netty http server, so the port number is not needed to be defined in the uri from("netty-http:http://localhost/bar?nettySharedHttpServer=#myNettyServer") + .log("Bar route using thread ${threadName}") .to("mock:bar") .transform().constant("Bye Camel"); } http://git-wip-us.apache.org/repos/asf/camel/blob/a70a94fd/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapFactory.java index a27ccfb..18dfb4b 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapFactory.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapFactory.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.netty; +import java.util.concurrent.ThreadFactory; + import org.apache.camel.CamelContext; import org.apache.camel.Service; import org.jboss.netty.channel.Channel; @@ -31,15 +33,24 @@ import org.jboss.netty.channel.ChannelPipelineFactory; public interface NettyServerBootstrapFactory extends Service { /** - * Initializes this {@link NettyServerBootstrapFactory}. + * Initializes this <b>non-shared</b> {@link NettyServerBootstrapFactory}. * - * @param camelContext Use <tt>null</tt> if this factory is to be shared among other Camel applications. + * @param camelContext the {@link CamelContext} for non-shared bootstrap factory * @param configuration the bootstrap configuration * @param pipelineFactory the pipeline factory */ void init(CamelContext camelContext, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory); /** + * Initializes this <b>shared</b> {@link NettyServerBootstrapFactory}. + * + * @param threadFactory the thread factory to use for shared bootstrap factory + * @param configuration the bootstrap configuration + * @param pipelineFactory the pipeline factory + */ + void init(ThreadFactory threadFactory, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory); + + /** * When a new {@link Channel} is opened. */ void addChannel(Channel channel); http://git-wip-us.apache.org/repos/asf/camel/blob/a70a94fd/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java index 17b019b..23e90b0 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java @@ -20,6 +20,7 @@ import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import org.apache.camel.CamelContext; import org.apache.camel.support.ServiceSupport; @@ -42,6 +43,7 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme protected static final Logger LOG = LoggerFactory.getLogger(SingleTCPNettyServerBootstrapFactory.class); private final ChannelGroup allChannels; private CamelContext camelContext; + private ThreadFactory threadFactory; private NettyServerBootstrapConfiguration configuration; private ChannelPipelineFactory pipelineFactory; private ChannelFactory channelFactory; @@ -55,12 +57,17 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme } public void init(CamelContext camelContext, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory) { - // notice CamelContext can be optional this.camelContext = camelContext; this.configuration = configuration; this.pipelineFactory = pipelineFactory; } + public void init(ThreadFactory threadFactory, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory) { + this.threadFactory = threadFactory; + this.configuration = configuration; + this.pipelineFactory = pipelineFactory; + } + public void addChannel(Channel channel) { allChannels.add(channel); } @@ -79,6 +86,9 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme @Override protected void doStart() throws Exception { + if (camelContext == null && threadFactory == null) { + throw new IllegalArgumentException("Either CamelContext or ThreadFactory must be set on " + this); + } startServerBootstrap(); } @@ -91,20 +101,16 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme if (camelContext != null) { bossExecutor = camelContext.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPBoss"); workerExecutor = camelContext.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPWorker"); + } else { + bossExecutor = Executors.newCachedThreadPool(threadFactory); + workerExecutor = Executors.newCachedThreadPool(threadFactory); + } - if (configuration.getWorkerCount() <= 0) { - channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor); - } else { - channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor, - configuration.getWorkerCount()); - } + if (configuration.getWorkerCount() <= 0) { + channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor); } else { - if (configuration.getWorkerCount() <= 0) { - channelFactory = new NioServerSocketChannelFactory(); - } else { - channelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), - Executors.newCachedThreadPool(), configuration.getWorkerCount()); - } + channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor, + configuration.getWorkerCount()); } serverBootstrap = new ServerBootstrap(channelFactory); http://git-wip-us.apache.org/repos/asf/camel/blob/a70a94fd/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java index b43fe97..53bc792 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java @@ -20,6 +20,7 @@ import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import org.apache.camel.CamelContext; import org.apache.camel.support.ServiceSupport; @@ -43,6 +44,7 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme protected static final Logger LOG = LoggerFactory.getLogger(SingleUDPNettyServerBootstrapFactory.class); private final ChannelGroup allChannels; private CamelContext camelContext; + private ThreadFactory threadFactory; private NettyServerBootstrapConfiguration configuration; private ChannelPipelineFactory pipelineFactory; private DatagramChannelFactory datagramChannelFactory; @@ -55,12 +57,17 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme } public void init(CamelContext camelContext, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory) { - // notice CamelContext can be optional this.camelContext = camelContext; this.configuration = configuration; this.pipelineFactory = pipelineFactory; } + public void init(ThreadFactory threadFactory, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory) { + this.threadFactory = threadFactory; + this.configuration = configuration; + this.pipelineFactory = pipelineFactory; + } + public void addChannel(Channel channel) { allChannels.add(channel); } @@ -79,6 +86,9 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme @Override protected void doStart() throws Exception { + if (camelContext == null && threadFactory == null) { + throw new IllegalArgumentException("Either CamelContext or ThreadFactory must be set on " + this); + } startServerBootstrap(); } @@ -91,7 +101,7 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme if (camelContext != null) { workerExecutor = camelContext.getExecutorServiceManager().newCachedThreadPool(this, "NettyUDPWorker"); } else { - workerExecutor = Executors.newCachedThreadPool(); + workerExecutor = Executors.newCachedThreadPool(threadFactory); } if (configuration.getWorkerCount() <= 0) { http://git-wip-us.apache.org/repos/asf/camel/blob/a70a94fd/examples/camel-example-netty-http/README.txt ---------------------------------------------------------------------- diff --git a/examples/camel-example-netty-http/README.txt b/examples/camel-example-netty-http/README.txt index 616c26b..580e2b8 100644 --- a/examples/camel-example-netty-http/README.txt +++ b/examples/camel-example-netty-http/README.txt @@ -30,7 +30,7 @@ The port number can be changed by editing the following source file: In the Apache Karaf / ServiceMix shell type: - osgi:install -s mvn:mvn:org.apache.camel/camel-example-netty-http-shared/2.12.0 + osgi:install -s mvn:org.apache.camel/camel-example-netty-http-shared/2.12.0 Then you can install the Camel applications: