This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 37a537c CAMEL-14882: camel-main - Allow to configure thread pool profiles 37a537c is described below commit 37a537c27a8976034f3cfdbfec0fdff0216168f7 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Wed Apr 15 12:58:53 2020 +0200 CAMEL-14882: camel-main - Allow to configure thread pool profiles --- .../org/apache/camel/main/BaseMainSupport.java | 101 +++++++++++++++++++++ .../org/apache/camel/main/MainThreadPoolTest.java | 91 +++++++++++++++++++ 2 files changed, 192 insertions(+) diff --git a/core/camel-main/src/main/java/org/apache/camel/main/BaseMainSupport.java b/core/camel-main/src/main/java/org/apache/camel/main/BaseMainSupport.java index 4c2a5b1..0c6a8df 100644 --- a/core/camel-main/src/main/java/org/apache/camel/main/BaseMainSupport.java +++ b/core/camel-main/src/main/java/org/apache/camel/main/BaseMainSupport.java @@ -22,6 +22,7 @@ import java.io.InputStream; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; @@ -30,6 +31,7 @@ import java.util.Map; import java.util.Objects; import java.util.Properties; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; @@ -42,6 +44,7 @@ import org.apache.camel.PropertyBindingException; import org.apache.camel.RoutesBuilder; import org.apache.camel.RuntimeCamelException; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.builder.ThreadPoolProfileBuilder; import org.apache.camel.model.FaultToleranceConfigurationDefinition; import org.apache.camel.model.HystrixConfigurationDefinition; import org.apache.camel.model.Model; @@ -50,10 +53,12 @@ import org.apache.camel.model.Resilience4jConfigurationDefinition; import org.apache.camel.model.RouteDefinition; import org.apache.camel.spi.CamelBeanPostProcessor; import org.apache.camel.spi.DataFormat; +import org.apache.camel.spi.ExecutorServiceManager; import org.apache.camel.spi.Language; import org.apache.camel.spi.PropertiesComponent; import org.apache.camel.spi.PropertyConfigurer; import org.apache.camel.spi.RestConfiguration; +import org.apache.camel.support.CamelContextHelper; import org.apache.camel.support.LifecycleStrategySupport; import org.apache.camel.support.PropertyBindingSupport; import org.apache.camel.support.service.BaseService; @@ -64,6 +69,7 @@ import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.OrderedProperties; import org.apache.camel.util.PropertiesHelper; import org.apache.camel.util.StringHelper; +import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,6 +87,8 @@ public abstract class BaseMainSupport extends BaseService { private static final String SENSITIVE_KEYS = "passphrase|password|secretkey|accesstoken|clientsecret|authorizationtoken|sasljaasconfig"; + private static final String VALID_THREAD_POOL_KEYS = "id|poolsize|maxpoolsize|keepalivetime|timeunit|maxqueuesize|allowcorethreadtimeout|rejectedpolicy"; + protected final AtomicBoolean completed = new AtomicBoolean(false); protected volatile CamelContext camelContext; @@ -721,6 +729,7 @@ public abstract class BaseMainSupport extends BaseService { Map<String, Object> resilience4jProperties = new LinkedHashMap<>(); Map<String, Object> faultToleranceProperties = new LinkedHashMap<>(); Map<String, Object> restProperties = new LinkedHashMap<>(); + Map<String, Object> threadPoolProperties = new LinkedHashMap<>(); Map<String, Object> beansProperties = new LinkedHashMap<>(); for (String key : prop.stringPropertyNames()) { if (key.startsWith("camel.context.")) { @@ -753,6 +762,12 @@ public abstract class BaseMainSupport extends BaseService { String option = key.substring(11); validateOptionAndValue(key, option, value); restProperties.put(optionKey(option), value); + } else if (key.startsWith("camel.threadpool")) { + // grab the value + String value = prop.getProperty(key); + String option = key.substring(16); + validateOptionAndValue(key, option, value); + threadPoolProperties.put(optionKey(option), value); } else if (key.startsWith("camel.beans.")) { // grab the value String value = prop.getProperty(key); @@ -817,6 +832,10 @@ public abstract class BaseMainSupport extends BaseService { setPropertiesOnTarget(camelContext, rest, restProperties, "camel.rest.", mainConfigurationProperties.isAutoConfigurationFailFast(), true, autoConfiguredProperties); } + if (!threadPoolProperties.isEmpty()) { + LOG.debug("Auto-configuring Thread Pool from loaded properties: {}", threadPoolProperties.size()); + setThreadPoolProfileProperties(camelContext, threadPoolProperties, mainConfigurationProperties.isAutoConfigurationFailFast(), autoConfiguredProperties); + } // log which options was not set if (!beansProperties.isEmpty()) { @@ -856,6 +875,88 @@ public abstract class BaseMainSupport extends BaseService { LOG.warn("Property not auto-configured: camel.rest.{}={} on bean: {}", k, v, rest); }); } + if (!threadPoolProperties.isEmpty()) { + threadPoolProperties.forEach((k, v) -> { + LOG.warn("Property not auto-configured: camel.threadpool{}={} on bean: ThreadPoolProfileBuilder", k, v); + }); + } + } + + private void setThreadPoolProfileProperties(CamelContext camelContext, Map<String, Object> threadPoolProperties, + boolean failIfNotSet, Map<String, String> autoConfiguredProperties) { + + Map<String, Map<String, String>> profiles = new LinkedHashMap<>(); + // the id of the profile is in the key [xx] + threadPoolProperties.forEach((k, v) -> { + String id = StringHelper.between(k, "[", "]."); + if (id == null) { + throw new IllegalArgumentException("Invalid syntax for key: camel.threadpool" + k + " should be: camel.threadpool[id]"); + } + String key = StringHelper.after(k, "]."); + String value = v.toString(); + if (key == null) { + throw new PropertyBindingException("ThreadPoolProfileBuilder", k, value); + } + Map<String, String> map = profiles.computeIfAbsent(id, o -> new HashMap<>()); + map.put(optionKey(key), value); + + if (failIfNotSet && !VALID_THREAD_POOL_KEYS.contains(key)) { + throw new PropertyBindingException("ThreadPoolProfileBuilder", key, value); + } + + autoConfiguredProperties.put(k, value); + }); + + // now build profiles from those options + for (String id : profiles.keySet()) { + Map<String, String> map = profiles.get(id); + // camel-main will lower-case keys + String overrideId = map.remove("id"); + String poolSize = map.remove("poolsize"); + String maxPoolSize = map.remove("maxpoolsize"); + String keepAliveTime = map.remove("keepalivetime"); + String timeUnit = map.remove("timeunit"); + String maxQueueSize = map.remove("maxqueuesize"); + String allowCoreThreadTimeOut = map.remove("allowcorethreadtimeout"); + String rejectedPolicy = map.remove("rejectedpolicy"); + + if (overrideId != null) { + id = CamelContextHelper.parseText(camelContext, overrideId); + } + ThreadPoolProfileBuilder builder = new ThreadPoolProfileBuilder(id); + if ("default".equals(id)) { + builder.defaultProfile(true); + } + if (poolSize != null) { + builder.poolSize(CamelContextHelper.parseInteger(camelContext, poolSize)); + } + if (maxPoolSize != null) { + builder.maxPoolSize(CamelContextHelper.parseInteger(camelContext, maxPoolSize)); + } + if (keepAliveTime != null && timeUnit != null) { + String text = CamelContextHelper.parseText(camelContext, timeUnit); + builder.keepAliveTime(CamelContextHelper.parseLong(camelContext, keepAliveTime), camelContext.getTypeConverter().convertTo(TimeUnit.class, text)); + } + if (keepAliveTime != null && timeUnit == null) { + builder.keepAliveTime(CamelContextHelper.parseLong(camelContext, keepAliveTime)); + } + if (maxQueueSize != null) { + builder.maxQueueSize(CamelContextHelper.parseInteger(camelContext, maxQueueSize)); + } + if (allowCoreThreadTimeOut != null) { + builder.allowCoreThreadTimeOut(CamelContextHelper.parseBoolean(camelContext, allowCoreThreadTimeOut)); + } + if (rejectedPolicy != null) { + String text = CamelContextHelper.parseText(camelContext, rejectedPolicy); + builder.rejectedPolicy(camelContext.getTypeConverter().convertTo(ThreadPoolRejectedPolicy.class, text)); + } + ExecutorServiceManager esm = camelContext.adapt(ExtendedCamelContext.class).getExecutorServiceManager(); + if ("default".equals(id)) { + esm.setDefaultThreadPoolProfile(builder.build()); + } else { + esm.registerThreadPoolProfile(builder.build()); + } + } } private void bindBeansToRegistry(CamelContext camelContext, Map<String, Object> properties, diff --git a/core/camel-main/src/test/java/org/apache/camel/main/MainThreadPoolTest.java b/core/camel-main/src/test/java/org/apache/camel/main/MainThreadPoolTest.java new file mode 100644 index 0000000..949cd59 --- /dev/null +++ b/core/camel-main/src/test/java/org/apache/camel/main/MainThreadPoolTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.main; + +import org.apache.camel.CamelContext; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.spi.ThreadPoolProfile; +import org.junit.Assert; +import org.junit.Test; + +public class MainThreadPoolTest extends Assert { + + @Test + public void testDefaultThreadPool() throws Exception { + Main main = new Main(); + main.addRoutesBuilder(new MyRouteBuilder()); + main.addProperty("camel.threadpool[default].pool-size", "5"); + main.addProperty("camel.threadpool[default].max-pool-size", "10"); + main.addProperty("camel.threadpool[default].max-queue-size", "20"); + main.addProperty("camel.threadpool[default].rejectedPolicy", "DiscardOldest"); + main.start(); + + CamelContext camelContext = main.getCamelContext(); + assertNotNull(camelContext); + + ThreadPoolProfile tp = camelContext.getExecutorServiceManager().getDefaultThreadPoolProfile(); + assertEquals("default", tp.getId()); + assertEquals(Boolean.TRUE, tp.isDefaultProfile()); + assertEquals("5", tp.getPoolSize().toString()); + assertEquals("10", tp.getMaxPoolSize().toString()); + assertEquals("DiscardOldest", tp.getRejectedPolicy().toString()); + + main.stop(); + } + + @Test + public void testCustomThreadPool() throws Exception { + Main main = new Main(); + main.addRoutesBuilder(new MyRouteBuilder()); + main.addProperty("camel.threadpool[myPool].id", "myPool"); + main.addProperty("camel.threadpool[myPool].pool-size", "1"); + main.addProperty("camel.threadpool[myPool].max-pool-size", "2"); + main.addProperty("camel.threadpool[myPool].rejectedPolicy", "DiscardOldest"); + main.addProperty("camel.threadpool[myBigPool].id", "myBigPool"); + main.addProperty("camel.threadpool[myBigPool].pool-size", "10"); + main.addProperty("camel.threadpool[myBigPool].max-pool-size", "200"); + main.addProperty("camel.threadpool[myBigPool].rejectedPolicy", "CallerRuns"); + main.start(); + + CamelContext camelContext = main.getCamelContext(); + assertNotNull(camelContext); + + ThreadPoolProfile tp = camelContext.getExecutorServiceManager().getThreadPoolProfile("myPool"); + assertEquals("myPool", tp.getId()); + assertEquals(Boolean.FALSE, tp.isDefaultProfile()); + assertEquals("1", tp.getPoolSize().toString()); + assertEquals("2", tp.getMaxPoolSize().toString()); + assertEquals("DiscardOldest", tp.getRejectedPolicy().toString()); + + tp = camelContext.getExecutorServiceManager().getThreadPoolProfile("myBigPool"); + assertEquals("myBigPool", tp.getId()); + assertEquals(Boolean.FALSE, tp.isDefaultProfile()); + assertEquals("10", tp.getPoolSize().toString()); + assertEquals("200", tp.getMaxPoolSize().toString()); + assertEquals("CallerRuns", tp.getRejectedPolicy().toString()); + + main.stop(); + } + + public static class MyRouteBuilder extends RouteBuilder { + @Override + public void configure() throws Exception { + from("direct:start").to("seda:foo"); + } + } + +}