This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new ed419c9  CAMEL-14247: camel-netty thread pool for consumer should not 
be fixed at 16 by default but take into account netty thread pool size and be 
bigger
ed419c9 is described below

commit ed419c96a0010bf1a1221c46c9a9bb50ebc589b4
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Wed Dec 4 14:53:23 2019 +0100

    CAMEL-14247: camel-netty thread pool for consumer should not be fixed at 16 
by default but take into account netty thread pool size and be bigger
---
 .../src/main/docs/netty-http-component.adoc        |  4 +--
 .../camel-netty/src/main/docs/netty-component.adoc |  4 +--
 .../netty/DefaultServerInitializerFactory.java     |  2 --
 .../camel/component/netty/NettyComponent.java      | 39 +++++++++++-----------
 .../apache/camel/component/netty/NettyHelper.java  | 17 ++++++++++
 .../NettyHttpComponentConfiguration.java           | 12 +++++--
 .../springboot/NettyComponentConfiguration.java    | 12 +++++--
 7 files changed, 59 insertions(+), 31 deletions(-)

diff --git 
a/components/camel-netty-http/src/main/docs/netty-http-component.adoc 
b/components/camel-netty-http/src/main/docs/netty-http-component.adoc
index 79093e0..30a5ee5 100644
--- a/components/camel-netty-http/src/main/docs/netty-http-component.adoc
+++ b/components/camel-netty-http/src/main/docs/netty-http-component.adoc
@@ -104,8 +104,8 @@ The Netty HTTP component supports 11 options, which are 
listed below.
 | *headerFilterStrategy* (advanced) | To use a custom 
org.apache.camel.spi.HeaderFilterStrategy to filter headers. |  | 
HeaderFilterStrategy
 | *securityConfiguration* (security) | Refers to a 
org.apache.camel.component.netty.http.NettyHttpSecurityConfiguration for 
configuring secure web resources. |  | NettyHttpSecurityConfiguration
 | *useGlobalSslContext Parameters* (security) | Enable usage of global SSL 
context parameters. | false | boolean
-| *maximumPoolSize* (advanced) | The thread pool size for the 
EventExecutorGroup if its in use. The default value is 16. | 16 | int
-| *executorService* (advanced) | To use the given EventExecutorGroup. |  | 
EventExecutorGroup
+| *maximumPoolSize* (consumer) | Sets a maximum thread pool size for the netty 
consumer ordered thread pool. The default size is 2 x cpu core 1. Setting this 
value to eg 10 will then use 10 threads unless 2 x cpu core 1 is a higher 
value, which then will override and be used. For example if there are 8 cores, 
then the consumer thread pool will be 17. This thread pool is used to route 
messages received from Netty by Camel. We use a separate thread pool to ensure 
ordering of messages and a [...]
+| *executorService* (consumer) | To use the given EventExecutorGroup. |  | 
EventExecutorGroup
 | *sslContextParameters* (security) | To configure security using 
SSLContextParameters |  | SSLContextParameters
 | *basicPropertyBinding* (advanced) | Whether the component should use basic 
property binding (Camel 2.x) or the newer property binding with additional 
capabilities | false | boolean
 | *lazyStartProducer* (producer) | Whether the producer should be started lazy 
(on the first message). By starting lazy you can use this to allow CamelContext 
and routes to startup in situations where a producer may otherwise fail during 
starting and cause the route to fail being started. By deferring this startup 
to be lazy then the startup failure can be handled during routing messages via 
Camel's routing error handlers. Beware that when the first message is processed 
then creating and [...]
diff --git a/components/camel-netty/src/main/docs/netty-component.adoc 
b/components/camel-netty/src/main/docs/netty-component.adoc
index cd23c1d..81ef113 100644
--- a/components/camel-netty/src/main/docs/netty-component.adoc
+++ b/components/camel-netty/src/main/docs/netty-component.adoc
@@ -61,9 +61,9 @@ The Netty component supports 8 options, which are listed 
below.
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
-| *maximumPoolSize* (advanced) | The thread pool size for the 
EventExecutorGroup if its in use. The default value is 16. | 16 | int
+| *maximumPoolSize* (consumer) | Sets a maximum thread pool size for the netty 
consumer ordered thread pool. The default size is 2 x cpu core 1. Setting this 
value to eg 10 will then use 10 threads unless 2 x cpu core 1 is a higher 
value, which then will override and be used. For example if there are 8 cores, 
then the consumer thread pool will be 17. This thread pool is used to route 
messages received from Netty by Camel. We use a separate thread pool to ensure 
ordering of messages and a [...]
 | *configuration* (advanced) | To use the NettyConfiguration as configuration 
when creating endpoints. |  | NettyConfiguration
-| *executorService* (advanced) | To use the given EventExecutorGroup. |  | 
EventExecutorGroup
+| *executorService* (consumer) | To use the given EventExecutorGroup. |  | 
EventExecutorGroup
 | *useGlobalSslContext Parameters* (security) | Enable usage of global SSL 
context parameters. | false | boolean
 | *sslContextParameters* (security) | To configure security using 
SSLContextParameters |  | SSLContextParameters
 | *basicPropertyBinding* (advanced) | Whether the component should use basic 
property binding (Camel 2.x) or the newer property binding with additional 
capabilities | false | boolean
diff --git 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerInitializerFactory.java
 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerInitializerFactory.java
index 31c3787..5489c6f 100644
--- 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerInitializerFactory.java
+++ 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerInitializerFactory.java
@@ -103,14 +103,12 @@ public class DefaultServerInitializerFactory extends 
ServerInitializerFactory {
             // Just use EventExecutorGroup from the Netty Component
             EventExecutorGroup applicationExecutor = 
consumer.getEndpoint().getComponent().getExecutorService();
             addToPipeline("handler", channelPipeline, applicationExecutor, new 
ServerChannelHandler(consumer));
-
         } else {
             // still use the worker event loop group here
             addToPipeline("handler", channelPipeline, new 
ServerChannelHandler(consumer));
 
         }
         LOG.trace("Created ChannelPipeline: {}", channelPipeline);
-
     }
 
     private void addToPipeline(String name, ChannelPipeline pipeline, 
ChannelHandler handler) {
diff --git 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java
 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java
index a579f28..112e580 100644
--- 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java
+++ 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java
@@ -19,10 +19,10 @@ package org.apache.camel.component.netty;
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ThreadFactory;
 
-import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import io.netty.util.NettyRuntime;
 import io.netty.util.concurrent.EventExecutorGroup;
+import io.netty.util.internal.SystemPropertyUtil;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.ExtendedCamelContext;
@@ -33,16 +33,15 @@ import org.apache.camel.spi.annotations.Component;
 import org.apache.camel.support.DefaultComponent;
 import org.apache.camel.support.PropertyBindingSupport;
 import org.apache.camel.support.jsse.SSLContextParameters;
-import org.apache.camel.util.concurrent.CamelThreadFactory;
 
 @Component("netty")
 public class NettyComponent extends DefaultComponent implements 
SSLContextParametersAware {
 
     @Metadata(label = "advanced")
     private NettyConfiguration configuration;
-    @Metadata(label = "advanced", defaultValue = "16")
-    private int maximumPoolSize = 16;
-    @Metadata(label = "advanced")
+    @Metadata(label = "consumer,advanced")
+    private int maximumPoolSize;
+    @Metadata(label = "consumer,advanced")
     private volatile EventExecutorGroup executorService;
     @Metadata(label = "security", defaultValue = "false")
     private boolean useGlobalSslContextParameters;
@@ -62,9 +61,14 @@ public class NettyComponent extends DefaultComponent 
implements SSLContextParame
     }
 
     /**
-     * The thread pool size for the EventExecutorGroup if its in use.
-     * <p/>
-     * The default value is 16.
+     * Sets a maximum thread pool size for the netty consumer ordered thread 
pool.
+     * The default size is 2 x cpu core + 1. Setting this value to eg 10 will 
then use 10 threads
+     * unless 2 x cpu core + 1 is a higher value, which then will override and 
be used. For example
+     * if there are 8 cores, then the consumer thread pool will be 17.
+     *
+     * This thread pool is used to route messages received from Netty by Camel.
+     * We use a separate thread pool to ensure ordering of messages and also 
in case some messages
+     * will block, then nettys worker threads (event loop) wont be affected.
      */
     public void setMaximumPoolSize(int maximumPoolSize) {
         this.maximumPoolSize = maximumPoolSize;
@@ -164,21 +168,18 @@ public class NettyComponent extends DefaultComponent 
implements SSLContextParame
 
         //Only setup the executorService if it is needed
         if (configuration.isUsingExecutorService() && executorService == null) 
{
-            executorService = createExecutorService();
+            int netty = SystemPropertyUtil.getInt("io.netty.eventLoopThreads", 
NettyRuntime.availableProcessors() * 2);
+            // we want one more thread than netty uses for its event loop
+            // and if there is a custom size for maximum pool size then use 
it, unless netty event loops has more threads
+            // and therefore we use math.max to find the highest value
+            int threads = Math.max(maximumPoolSize, netty + 1);
+            executorService = 
NettyHelper.createExecutorGroup(getCamelContext(), 
"NettyConsumerExecutorGroup", threads);
+            log.info("Creating shared NettyConsumerExecutorGroup with {} 
threads", threads);
         }
 
         super.doStart();
     }
 
-    protected EventExecutorGroup createExecutorService() {
-        // Provide the executor service for the application
-        // and use a Camel thread factory so we have consistent thread namings
-        // we should use a shared thread pool as recommended by Netty
-        String pattern = 
getCamelContext().getExecutorServiceManager().getThreadNamePattern();
-        ThreadFactory factory = new CamelThreadFactory(pattern, 
"NettyEventExecutorGroup", true);
-        return new DefaultEventExecutorGroup(getMaximumPoolSize(), factory);
-    }
-
     @Override
     protected void doStop() throws Exception {
         //Only shutdown the executorService if it is created by netty component
diff --git 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
index 9b6a39e..d240749 100644
--- 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
+++ 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
@@ -18,13 +18,18 @@ package org.apache.camel.component.netty;
 
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.util.concurrent.ThreadFactory;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.DefaultAddressedEnvelope;
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import io.netty.util.concurrent.EventExecutorGroup;
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.NoTypeConversionAvailableException;
+import org.apache.camel.util.concurrent.CamelThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -121,4 +126,16 @@ public final class NettyHelper {
         }
     }
 
+    /**
+     * Creates a {@link DefaultEventExecutorGroup} with the given name and 
maximum thread pool size.
+     */
+    public static EventExecutorGroup createExecutorGroup(CamelContext 
camelContext, String name, int threads) {
+        // Provide the executor service for the application
+        // and use a Camel thread factory so we have consistent thread namings
+        // we should use a shared thread pool as recommended by Netty
+        String pattern = 
camelContext.getExecutorServiceManager().getThreadNamePattern();
+        ThreadFactory factory = new CamelThreadFactory(pattern, name, true);
+        return new DefaultEventExecutorGroup(threads, factory);
+    }
+
 }
diff --git 
a/platforms/spring-boot/components-starter/camel-netty-http-starter/src/main/java/org/apache/camel/component/netty/http/springboot/NettyHttpComponentConfiguration.java
 
b/platforms/spring-boot/components-starter/camel-netty-http-starter/src/main/java/org/apache/camel/component/netty/http/springboot/NettyHttpComponentConfiguration.java
index c5f9743..5de25cd 100644
--- 
a/platforms/spring-boot/components-starter/camel-netty-http-starter/src/main/java/org/apache/camel/component/netty/http/springboot/NettyHttpComponentConfiguration.java
+++ 
b/platforms/spring-boot/components-starter/camel-netty-http-starter/src/main/java/org/apache/camel/component/netty/http/springboot/NettyHttpComponentConfiguration.java
@@ -65,10 +65,16 @@ public class NettyHttpComponentConfiguration
      */
     private Boolean useGlobalSslContextParameters = false;
     /**
-     * The thread pool size for the EventExecutorGroup if its in use. The
-     * default value is 16.
+     * Sets a maximum thread pool size for the netty consumer ordered thread
+     * pool. The default size is 2 x cpu core 1. Setting this value to eg 10
+     * will then use 10 threads unless 2 x cpu core 1 is a higher value, which
+     * then will override and be used. For example if there are 8 cores, then
+     * the consumer thread pool will be 17. This thread pool is used to route
+     * messages received from Netty by Camel. We use a separate thread pool to
+     * ensure ordering of messages and also in case some messages will block,
+     * then nettys worker threads (event loop) wont be affected.
      */
-    private Integer maximumPoolSize = 16;
+    private Integer maximumPoolSize;
     /**
      * To use the given EventExecutorGroup. The option is a
      * io.netty.util.concurrent.EventExecutorGroup type.
diff --git 
a/platforms/spring-boot/components-starter/camel-netty-starter/src/main/java/org/apache/camel/component/netty/springboot/NettyComponentConfiguration.java
 
b/platforms/spring-boot/components-starter/camel-netty-starter/src/main/java/org/apache/camel/component/netty/springboot/NettyComponentConfiguration.java
index 01fdf34..ff0b188 100644
--- 
a/platforms/spring-boot/components-starter/camel-netty-starter/src/main/java/org/apache/camel/component/netty/springboot/NettyComponentConfiguration.java
+++ 
b/platforms/spring-boot/components-starter/camel-netty-starter/src/main/java/org/apache/camel/component/netty/springboot/NettyComponentConfiguration.java
@@ -52,10 +52,16 @@ public class NettyComponentConfiguration
      */
     private Boolean enabled;
     /**
-     * The thread pool size for the EventExecutorGroup if its in use. The
-     * default value is 16.
+     * Sets a maximum thread pool size for the netty consumer ordered thread
+     * pool. The default size is 2 x cpu core 1. Setting this value to eg 10
+     * will then use 10 threads unless 2 x cpu core 1 is a higher value, which
+     * then will override and be used. For example if there are 8 cores, then
+     * the consumer thread pool will be 17. This thread pool is used to route
+     * messages received from Netty by Camel. We use a separate thread pool to
+     * ensure ordering of messages and also in case some messages will block,
+     * then nettys worker threads (event loop) wont be affected.
      */
-    private Integer maximumPoolSize = 16;
+    private Integer maximumPoolSize;
     /**
      * To use the NettyConfiguration as configuration when creating endpoints.
      */

Reply via email to