This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 37a537c  CAMEL-14882: camel-main - Allow to configure thread pool profiles
37a537c is described below

commit 37a537c27a8976034f3cfdbfec0fdff0216168f7
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Wed Apr 15 12:58:53 2020 +0200

    CAMEL-14882: camel-main - Allow to configure thread pool profiles
---
 .../org/apache/camel/main/BaseMainSupport.java     | 101 +++++++++++++++++++++
 .../org/apache/camel/main/MainThreadPoolTest.java  |  91 +++++++++++++++++++
 2 files changed, 192 insertions(+)

diff --git a/core/camel-main/src/main/java/org/apache/camel/main/BaseMainSupport.java b/core/camel-main/src/main/java/org/apache/camel/main/BaseMainSupport.java
index 4c2a5b1..0c6a8df 100644
--- a/core/camel-main/src/main/java/org/apache/camel/main/BaseMainSupport.java
+++ b/core/camel-main/src/main/java/org/apache/camel/main/BaseMainSupport.java
@@ -22,6 +22,7 @@ import java.io.InputStream;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -30,6 +31,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 
@@ -42,6 +44,7 @@ import org.apache.camel.PropertyBindingException;
 import org.apache.camel.RoutesBuilder;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.builder.ThreadPoolProfileBuilder;
 import org.apache.camel.model.FaultToleranceConfigurationDefinition;
 import org.apache.camel.model.HystrixConfigurationDefinition;
 import org.apache.camel.model.Model;
@@ -50,10 +53,12 @@ import org.apache.camel.model.Resilience4jConfigurationDefinition;
 import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.spi.CamelBeanPostProcessor;
 import org.apache.camel.spi.DataFormat;
+import org.apache.camel.spi.ExecutorServiceManager;
 import org.apache.camel.spi.Language;
 import org.apache.camel.spi.PropertiesComponent;
 import org.apache.camel.spi.PropertyConfigurer;
 import org.apache.camel.spi.RestConfiguration;
+import org.apache.camel.support.CamelContextHelper;
 import org.apache.camel.support.LifecycleStrategySupport;
 import org.apache.camel.support.PropertyBindingSupport;
 import org.apache.camel.support.service.BaseService;
@@ -64,6 +69,7 @@ import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.OrderedProperties;
 import org.apache.camel.util.PropertiesHelper;
 import org.apache.camel.util.StringHelper;
+import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,6 +87,8 @@ public abstract class BaseMainSupport extends BaseService {
 
     private static final String SENSITIVE_KEYS = 
"passphrase|password|secretkey|accesstoken|clientsecret|authorizationtoken|sasljaasconfig";
 
+    /**
+     * Pipe-separated names of the options understood for a thread pool profile.
+     * The names are written lower-cased and without dashes, which is the form in
+     * which the options are looked up when a profile is built; the list is consulted
+     * when thread pool options are validated in fail-fast mode.
+     */
+    private static final String VALID_THREAD_POOL_KEYS = 
"id|poolsize|maxpoolsize|keepalivetime|timeunit|maxqueuesize|allowcorethreadtimeout|rejectedpolicy";
+
     protected final AtomicBoolean completed = new AtomicBoolean(false);
 
     protected volatile CamelContext camelContext;
@@ -721,6 +729,7 @@ public abstract class BaseMainSupport extends BaseService {
         Map<String, Object> resilience4jProperties = new LinkedHashMap<>();
         Map<String, Object> faultToleranceProperties = new LinkedHashMap<>();
         Map<String, Object> restProperties = new LinkedHashMap<>();
+        Map<String, Object> threadPoolProperties = new LinkedHashMap<>();
         Map<String, Object> beansProperties = new LinkedHashMap<>();
         for (String key : prop.stringPropertyNames()) {
             if (key.startsWith("camel.context.")) {
@@ -753,6 +762,12 @@ public abstract class BaseMainSupport extends BaseService {
                 String option = key.substring(11);
                 validateOptionAndValue(key, option, value);
                 restProperties.put(optionKey(option), value);
+            } else if (key.startsWith("camel.threadpool")) {
+                // grab the value
+                String value = prop.getProperty(key);
+                String option = key.substring(16);
+                validateOptionAndValue(key, option, value);
+                threadPoolProperties.put(optionKey(option), value);
             } else if (key.startsWith("camel.beans.")) {
                 // grab the value
                 String value = prop.getProperty(key);
@@ -817,6 +832,10 @@ public abstract class BaseMainSupport extends BaseService {
             setPropertiesOnTarget(camelContext, rest, restProperties, 
"camel.rest.",
                     mainConfigurationProperties.isAutoConfigurationFailFast(), 
true, autoConfiguredProperties);
         }
+        if (!threadPoolProperties.isEmpty()) {
+            LOG.debug("Auto-configuring Thread Pool from loaded properties: 
{}", threadPoolProperties.size());
+            setThreadPoolProfileProperties(camelContext, threadPoolProperties, 
mainConfigurationProperties.isAutoConfigurationFailFast(), 
autoConfiguredProperties);
+        }
 
         // log which options was not set
         if (!beansProperties.isEmpty()) {
@@ -856,6 +875,88 @@ public abstract class BaseMainSupport extends BaseService {
                 LOG.warn("Property not auto-configured: camel.rest.{}={} on 
bean: {}", k, v, rest);
             });
         }
+        if (!threadPoolProperties.isEmpty()) {
+            threadPoolProperties.forEach((k, v) -> {
+                LOG.warn("Property not auto-configured: camel.threadpool{}={} 
on bean: ThreadPoolProfileBuilder", k, v);
+            });
+        }
+    }
+
+    /**
+     * Builds thread pool profiles from the collected {@code camel.threadpool[id].option}
+     * properties and hands them to the {@link ExecutorServiceManager} of the context.
+     * <p>
+     * Every key must have the form {@code [id].option}. Options are grouped per id and
+     * applied to a {@link ThreadPoolProfileBuilder}. An explicit {@code id} option overrides
+     * the id taken from the brackets (camel-main lower-cases keys, so this is how a
+     * mixed-case id can be given). A profile whose id is {@code default} replaces the
+     * default thread pool profile; every other profile is registered as an additional one.
+     * <p>
+     * Options that were applied are recorded in {@code autoConfiguredProperties} and removed
+     * from {@code threadPoolProperties}, so whatever is left in that map afterwards are the
+     * options that could not be applied.
+     *
+     * @param camelContext             the context whose executor service manager is configured
+     * @param threadPoolProperties     the options, keyed by the text following {@code camel.threadpool};
+     *                                 applied entries are removed from this map
+     * @param failIfNotSet             whether an unknown option name is an error; when {@code false}
+     *                                 unknown options are skipped and left in {@code threadPoolProperties}
+     * @param autoConfiguredProperties receives every option that was applied
+     * @throws IllegalArgumentException if a key does not follow the {@code [id].option} syntax
+     * @throws PropertyBindingException if {@code failIfNotSet} is set and an option name is unknown
+     */
+    private void setThreadPoolProfileProperties(CamelContext camelContext, Map<String, Object> threadPoolProperties,
+                                                boolean failIfNotSet, Map<String, String> autoConfiguredProperties) {
+
+        // option names must match exactly; a substring test on the pipe-separated list
+        // would also accept fragments such as "pool" or an empty name
+        Set<String> validKeys = new LinkedHashSet<>();
+        Collections.addAll(validKeys, VALID_THREAD_POOL_KEYS.split("\\|"));
+
+        Map<String, Map<String, String>> profiles = new LinkedHashMap<>();
+        // keys that were applied; they are removed from threadPoolProperties at the end
+        // because the map cannot be modified while it is being iterated
+        List<String> applied = new ArrayList<>();
+        // the id of the profile is in the key [xx]
+        threadPoolProperties.forEach((k, v) -> {
+            String id = StringHelper.between(k, "[", "].");
+            if (id == null) {
+                throw new IllegalArgumentException("Invalid syntax for key: camel.threadpool" + k + " should be: camel.threadpool[id]");
+            }
+            String key = StringHelper.after(k, "].");
+            String value = v.toString();
+            if (key == null) {
+                throw new PropertyBindingException("ThreadPoolProfileBuilder", k, value);
+            }
+            String option = optionKey(key);
+            // the profile id is registered even when this particular option turns out to be unknown
+            Map<String, String> options = profiles.computeIfAbsent(id, o -> new HashMap<>());
+
+            if (!validKeys.contains(option)) {
+                if (failIfNotSet) {
+                    throw new PropertyBindingException("ThreadPoolProfileBuilder", key, value);
+                }
+                // not applied: keep it in threadPoolProperties so the caller can report it
+                return;
+            }
+
+            options.put(option, value);
+            autoConfiguredProperties.put(k, value);
+            applied.add(k);
+        });
+
+        // now build profiles from those options
+        for (Map.Entry<String, Map<String, String>> entry : profiles.entrySet()) {
+            Map<String, String> options = entry.getValue();
+            // camel-main will lower-case keys
+            String overrideId = options.get("id");
+            String poolSize = options.get("poolsize");
+            String maxPoolSize = options.get("maxpoolsize");
+            String keepAliveTime = options.get("keepalivetime");
+            String timeUnit = options.get("timeunit");
+            String maxQueueSize = options.get("maxqueuesize");
+            String allowCoreThreadTimeOut = options.get("allowcorethreadtimeout");
+            String rejectedPolicy = options.get("rejectedpolicy");
+
+            String id = overrideId != null ? CamelContextHelper.parseText(camelContext, overrideId) : entry.getKey();
+            boolean defaultProfile = "default".equals(id);
+
+            ThreadPoolProfileBuilder builder = new ThreadPoolProfileBuilder(id);
+            if (defaultProfile) {
+                builder.defaultProfile(true);
+            }
+            if (poolSize != null) {
+                builder.poolSize(CamelContextHelper.parseInteger(camelContext, poolSize));
+            }
+            if (maxPoolSize != null) {
+                builder.maxPoolSize(CamelContextHelper.parseInteger(camelContext, maxPoolSize));
+            }
+            // a time unit on its own (without a keep alive time) has nothing to apply to and is ignored
+            if (keepAliveTime != null) {
+                if (timeUnit != null) {
+                    String text = CamelContextHelper.parseText(camelContext, timeUnit);
+                    builder.keepAliveTime(CamelContextHelper.parseLong(camelContext, keepAliveTime), camelContext.getTypeConverter().convertTo(TimeUnit.class, text));
+                } else {
+                    builder.keepAliveTime(CamelContextHelper.parseLong(camelContext, keepAliveTime));
+                }
+            }
+            if (maxQueueSize != null) {
+                builder.maxQueueSize(CamelContextHelper.parseInteger(camelContext, maxQueueSize));
+            }
+            if (allowCoreThreadTimeOut != null) {
+                builder.allowCoreThreadTimeOut(CamelContextHelper.parseBoolean(camelContext, allowCoreThreadTimeOut));
+            }
+            if (rejectedPolicy != null) {
+                String text = CamelContextHelper.parseText(camelContext, rejectedPolicy);
+                builder.rejectedPolicy(camelContext.getTypeConverter().convertTo(ThreadPoolRejectedPolicy.class, text));
+            }
+            ExecutorServiceManager esm = camelContext.adapt(ExtendedCamelContext.class).getExecutorServiceManager();
+            if (defaultProfile) {
+                esm.setDefaultThreadPoolProfile(builder.build());
+            } else {
+                esm.registerThreadPoolProfile(builder.build());
+            }
+        }
+
+        // everything that was applied is done with; only unapplied options stay behind
+        threadPoolProperties.keySet().removeAll(applied);
     }
 
     private void bindBeansToRegistry(CamelContext camelContext, Map<String, 
Object> properties,
diff --git a/core/camel-main/src/test/java/org/apache/camel/main/MainThreadPoolTest.java b/core/camel-main/src/test/java/org/apache/camel/main/MainThreadPoolTest.java
new file mode 100644
index 0000000..949cd59
--- /dev/null
+++ b/core/camel-main/src/test/java/org/apache/camel/main/MainThreadPoolTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.main;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.ThreadPoolProfile;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Verifies that thread pool profiles can be configured on {@link Main} through
+ * {@code camel.threadpool[id].option} properties.
+ */
+public class MainThreadPoolTest extends Assert {
+
+    /**
+     * Options given for the {@code default} id must end up on the default thread pool profile.
+     */
+    @Test
+    public void testDefaultThreadPool() throws Exception {
+        Main main = new Main();
+        main.addRoutesBuilder(new MyRouteBuilder());
+        main.addProperty("camel.threadpool[default].pool-size", "5");
+        main.addProperty("camel.threadpool[default].max-pool-size", "10");
+        main.addProperty("camel.threadpool[default].max-queue-size", "20");
+        main.addProperty("camel.threadpool[default].rejectedPolicy", "DiscardOldest");
+        // stop in finally so a failing assertion does not leave a started Main behind
+        try {
+            main.start();
+
+            CamelContext camelContext = main.getCamelContext();
+            assertNotNull(camelContext);
+
+            ThreadPoolProfile tp = camelContext.getExecutorServiceManager().getDefaultThreadPoolProfile();
+            assertEquals("default", tp.getId());
+            assertEquals(Boolean.TRUE, tp.isDefaultProfile());
+            assertEquals(5, tp.getPoolSize().intValue());
+            assertEquals(10, tp.getMaxPoolSize().intValue());
+            assertEquals(20, tp.getMaxQueueSize().intValue());
+            assertEquals("DiscardOldest", tp.getRejectedPolicy().toString());
+        } finally {
+            main.stop();
+        }
+    }
+
+    /**
+     * Several non-default profiles can be declared side by side; the {@code id} option
+     * carries the mixed-case id of each profile.
+     */
+    @Test
+    public void testCustomThreadPool() throws Exception {
+        Main main = new Main();
+        main.addRoutesBuilder(new MyRouteBuilder());
+        main.addProperty("camel.threadpool[myPool].id", "myPool");
+        main.addProperty("camel.threadpool[myPool].pool-size", "1");
+        main.addProperty("camel.threadpool[myPool].max-pool-size", "2");
+        main.addProperty("camel.threadpool[myPool].rejectedPolicy", "DiscardOldest");
+        main.addProperty("camel.threadpool[myBigPool].id", "myBigPool");
+        main.addProperty("camel.threadpool[myBigPool].pool-size", "10");
+        main.addProperty("camel.threadpool[myBigPool].max-pool-size", "200");
+        main.addProperty("camel.threadpool[myBigPool].rejectedPolicy", "CallerRuns");
+        // stop in finally so a failing assertion does not leave a started Main behind
+        try {
+            main.start();
+
+            CamelContext camelContext = main.getCamelContext();
+            assertNotNull(camelContext);
+
+            assertCustomProfile(camelContext, "myPool", 1, 2, "DiscardOldest");
+            assertCustomProfile(camelContext, "myBigPool", 10, 200, "CallerRuns");
+        } finally {
+            main.stop();
+        }
+    }
+
+    /**
+     * Asserts that a non-default profile with the given id exists and carries the expected settings.
+     */
+    private static void assertCustomProfile(CamelContext camelContext, String id,
+                                            int poolSize, int maxPoolSize, String rejectedPolicy) {
+        ThreadPoolProfile tp = camelContext.getExecutorServiceManager().getThreadPoolProfile(id);
+        assertNotNull(tp);
+        assertEquals(id, tp.getId());
+        assertEquals(Boolean.FALSE, tp.isDefaultProfile());
+        assertEquals(poolSize, tp.getPoolSize().intValue());
+        assertEquals(maxPoolSize, tp.getMaxPoolSize().intValue());
+        assertEquals(rejectedPolicy, tp.getRejectedPolicy().toString());
+    }
+
+    /**
+     * Minimal route so that the context has something to start.
+     */
+    public static class MyRouteBuilder extends RouteBuilder {
+        @Override
+        public void configure() throws Exception {
+            from("direct:start").to("seda:foo");
+        }
+    }
+
+}

Reply via email to