This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 37a537c CAMEL-14882: camel-main - Allow to configure thread pool
profiles
37a537c is described below
commit 37a537c27a8976034f3cfdbfec0fdff0216168f7
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Apr 15 12:58:53 2020 +0200
CAMEL-14882: camel-main - Allow to configure thread pool profiles
---
.../org/apache/camel/main/BaseMainSupport.java | 101 +++++++++++++++++++++
.../org/apache/camel/main/MainThreadPoolTest.java | 91 +++++++++++++++++++
2 files changed, 192 insertions(+)
diff --git
a/core/camel-main/src/main/java/org/apache/camel/main/BaseMainSupport.java
b/core/camel-main/src/main/java/org/apache/camel/main/BaseMainSupport.java
index 4c2a5b1..0c6a8df 100644
--- a/core/camel-main/src/main/java/org/apache/camel/main/BaseMainSupport.java
+++ b/core/camel-main/src/main/java/org/apache/camel/main/BaseMainSupport.java
@@ -22,6 +22,7 @@ import java.io.InputStream;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
@@ -30,6 +31,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
@@ -42,6 +44,7 @@ import org.apache.camel.PropertyBindingException;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.builder.ThreadPoolProfileBuilder;
import org.apache.camel.model.FaultToleranceConfigurationDefinition;
import org.apache.camel.model.HystrixConfigurationDefinition;
import org.apache.camel.model.Model;
@@ -50,10 +53,12 @@ import
org.apache.camel.model.Resilience4jConfigurationDefinition;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.spi.CamelBeanPostProcessor;
import org.apache.camel.spi.DataFormat;
+import org.apache.camel.spi.ExecutorServiceManager;
import org.apache.camel.spi.Language;
import org.apache.camel.spi.PropertiesComponent;
import org.apache.camel.spi.PropertyConfigurer;
import org.apache.camel.spi.RestConfiguration;
+import org.apache.camel.support.CamelContextHelper;
import org.apache.camel.support.LifecycleStrategySupport;
import org.apache.camel.support.PropertyBindingSupport;
import org.apache.camel.support.service.BaseService;
@@ -64,6 +69,7 @@ import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.OrderedProperties;
import org.apache.camel.util.PropertiesHelper;
import org.apache.camel.util.StringHelper;
+import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,6 +87,8 @@ public abstract class BaseMainSupport extends BaseService {
private static final String SENSITIVE_KEYS =
"passphrase|password|secretkey|accesstoken|clientsecret|authorizationtoken|sasljaasconfig";
+ private static final String VALID_THREAD_POOL_KEYS =
"id|poolsize|maxpoolsize|keepalivetime|timeunit|maxqueuesize|allowcorethreadtimeout|rejectedpolicy";
+
protected final AtomicBoolean completed = new AtomicBoolean(false);
protected volatile CamelContext camelContext;
@@ -721,6 +729,7 @@ public abstract class BaseMainSupport extends BaseService {
Map<String, Object> resilience4jProperties = new LinkedHashMap<>();
Map<String, Object> faultToleranceProperties = new LinkedHashMap<>();
Map<String, Object> restProperties = new LinkedHashMap<>();
+ Map<String, Object> threadPoolProperties = new LinkedHashMap<>();
Map<String, Object> beansProperties = new LinkedHashMap<>();
for (String key : prop.stringPropertyNames()) {
if (key.startsWith("camel.context.")) {
@@ -753,6 +762,12 @@ public abstract class BaseMainSupport extends BaseService {
String option = key.substring(11);
validateOptionAndValue(key, option, value);
restProperties.put(optionKey(option), value);
+ } else if (key.startsWith("camel.threadpool")) {
+ // grab the value
+ String value = prop.getProperty(key);
+ String option = key.substring(16);
+ validateOptionAndValue(key, option, value);
+ threadPoolProperties.put(optionKey(option), value);
} else if (key.startsWith("camel.beans.")) {
// grab the value
String value = prop.getProperty(key);
@@ -817,6 +832,10 @@ public abstract class BaseMainSupport extends BaseService {
setPropertiesOnTarget(camelContext, rest, restProperties,
"camel.rest.",
mainConfigurationProperties.isAutoConfigurationFailFast(),
true, autoConfiguredProperties);
}
+ if (!threadPoolProperties.isEmpty()) {
+ LOG.debug("Auto-configuring Thread Pool from loaded properties:
{}", threadPoolProperties.size());
+ setThreadPoolProfileProperties(camelContext, threadPoolProperties,
mainConfigurationProperties.isAutoConfigurationFailFast(),
autoConfiguredProperties);
+ }
// log which options was not set
if (!beansProperties.isEmpty()) {
@@ -856,6 +875,88 @@ public abstract class BaseMainSupport extends BaseService {
LOG.warn("Property not auto-configured: camel.rest.{}={} on
bean: {}", k, v, rest);
});
}
+ if (!threadPoolProperties.isEmpty()) {
+ threadPoolProperties.forEach((k, v) -> {
+ LOG.warn("Property not auto-configured: camel.threadpool{}={}
on bean: ThreadPoolProfileBuilder", k, v);
+ });
+ }
+ }
+
+ private void setThreadPoolProfileProperties(CamelContext camelContext,
Map<String, Object> threadPoolProperties,
+ boolean failIfNotSet,
Map<String, String> autoConfiguredProperties) {
+
+ Map<String, Map<String, String>> profiles = new LinkedHashMap<>();
+ // the id of the profile is in the key [xx]
+ threadPoolProperties.forEach((k, v) -> {
+ String id = StringHelper.between(k, "[", "].");
+ if (id == null) {
+ throw new IllegalArgumentException("Invalid syntax for key:
camel.threadpool" + k + " should be: camel.threadpool[id]");
+ }
+ String key = StringHelper.after(k, "].");
+ String value = v.toString();
+ if (key == null) {
+ throw new PropertyBindingException("ThreadPoolProfileBuilder",
k, value);
+ }
+ Map<String, String> map = profiles.computeIfAbsent(id, o -> new
HashMap<>());
+ map.put(optionKey(key), value);
+
+ if (failIfNotSet && !VALID_THREAD_POOL_KEYS.contains(key)) {
+ throw new PropertyBindingException("ThreadPoolProfileBuilder",
key, value);
+ }
+
+ autoConfiguredProperties.put(k, value);
+ });
+
+ // now build profiles from those options
+ for (String id : profiles.keySet()) {
+ Map<String, String> map = profiles.get(id);
+ // camel-main will lower-case keys
+ String overrideId = map.remove("id");
+ String poolSize = map.remove("poolsize");
+ String maxPoolSize = map.remove("maxpoolsize");
+ String keepAliveTime = map.remove("keepalivetime");
+ String timeUnit = map.remove("timeunit");
+ String maxQueueSize = map.remove("maxqueuesize");
+ String allowCoreThreadTimeOut =
map.remove("allowcorethreadtimeout");
+ String rejectedPolicy = map.remove("rejectedpolicy");
+
+ if (overrideId != null) {
+ id = CamelContextHelper.parseText(camelContext, overrideId);
+ }
+ ThreadPoolProfileBuilder builder = new
ThreadPoolProfileBuilder(id);
+ if ("default".equals(id)) {
+ builder.defaultProfile(true);
+ }
+ if (poolSize != null) {
+ builder.poolSize(CamelContextHelper.parseInteger(camelContext,
poolSize));
+ }
+ if (maxPoolSize != null) {
+
builder.maxPoolSize(CamelContextHelper.parseInteger(camelContext, maxPoolSize));
+ }
+ if (keepAliveTime != null && timeUnit != null) {
+ String text = CamelContextHelper.parseText(camelContext,
timeUnit);
+
builder.keepAliveTime(CamelContextHelper.parseLong(camelContext,
keepAliveTime), camelContext.getTypeConverter().convertTo(TimeUnit.class,
text));
+ }
+ if (keepAliveTime != null && timeUnit == null) {
+
builder.keepAliveTime(CamelContextHelper.parseLong(camelContext,
keepAliveTime));
+ }
+ if (maxQueueSize != null) {
+
builder.maxQueueSize(CamelContextHelper.parseInteger(camelContext,
maxQueueSize));
+ }
+ if (allowCoreThreadTimeOut != null) {
+
builder.allowCoreThreadTimeOut(CamelContextHelper.parseBoolean(camelContext,
allowCoreThreadTimeOut));
+ }
+ if (rejectedPolicy != null) {
+ String text = CamelContextHelper.parseText(camelContext,
rejectedPolicy);
+
builder.rejectedPolicy(camelContext.getTypeConverter().convertTo(ThreadPoolRejectedPolicy.class,
text));
+ }
+ ExecutorServiceManager esm =
camelContext.adapt(ExtendedCamelContext.class).getExecutorServiceManager();
+ if ("default".equals(id)) {
+ esm.setDefaultThreadPoolProfile(builder.build());
+ } else {
+ esm.registerThreadPoolProfile(builder.build());
+ }
+ }
}
private void bindBeansToRegistry(CamelContext camelContext, Map<String,
Object> properties,
diff --git
a/core/camel-main/src/test/java/org/apache/camel/main/MainThreadPoolTest.java
b/core/camel-main/src/test/java/org/apache/camel/main/MainThreadPoolTest.java
new file mode 100644
index 0000000..949cd59
--- /dev/null
+++
b/core/camel-main/src/test/java/org/apache/camel/main/MainThreadPoolTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.main;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.ThreadPoolProfile;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MainThreadPoolTest extends Assert {
+
+ @Test
+ public void testDefaultThreadPool() throws Exception {
+ Main main = new Main();
+ main.addRoutesBuilder(new MyRouteBuilder());
+ main.addProperty("camel.threadpool[default].pool-size", "5");
+ main.addProperty("camel.threadpool[default].max-pool-size", "10");
+ main.addProperty("camel.threadpool[default].max-queue-size", "20");
+ main.addProperty("camel.threadpool[default].rejectedPolicy",
"DiscardOldest");
+ main.start();
+
+ CamelContext camelContext = main.getCamelContext();
+ assertNotNull(camelContext);
+
+ ThreadPoolProfile tp =
camelContext.getExecutorServiceManager().getDefaultThreadPoolProfile();
+ assertEquals("default", tp.getId());
+ assertEquals(Boolean.TRUE, tp.isDefaultProfile());
+ assertEquals("5", tp.getPoolSize().toString());
+ assertEquals("10", tp.getMaxPoolSize().toString());
+ assertEquals("DiscardOldest", tp.getRejectedPolicy().toString());
+
+ main.stop();
+ }
+
+ @Test
+ public void testCustomThreadPool() throws Exception {
+ Main main = new Main();
+ main.addRoutesBuilder(new MyRouteBuilder());
+ main.addProperty("camel.threadpool[myPool].id", "myPool");
+ main.addProperty("camel.threadpool[myPool].pool-size", "1");
+ main.addProperty("camel.threadpool[myPool].max-pool-size", "2");
+ main.addProperty("camel.threadpool[myPool].rejectedPolicy",
"DiscardOldest");
+ main.addProperty("camel.threadpool[myBigPool].id", "myBigPool");
+ main.addProperty("camel.threadpool[myBigPool].pool-size", "10");
+ main.addProperty("camel.threadpool[myBigPool].max-pool-size", "200");
+ main.addProperty("camel.threadpool[myBigPool].rejectedPolicy",
"CallerRuns");
+ main.start();
+
+ CamelContext camelContext = main.getCamelContext();
+ assertNotNull(camelContext);
+
+ ThreadPoolProfile tp =
camelContext.getExecutorServiceManager().getThreadPoolProfile("myPool");
+ assertEquals("myPool", tp.getId());
+ assertEquals(Boolean.FALSE, tp.isDefaultProfile());
+ assertEquals("1", tp.getPoolSize().toString());
+ assertEquals("2", tp.getMaxPoolSize().toString());
+ assertEquals("DiscardOldest", tp.getRejectedPolicy().toString());
+
+ tp =
camelContext.getExecutorServiceManager().getThreadPoolProfile("myBigPool");
+ assertEquals("myBigPool", tp.getId());
+ assertEquals(Boolean.FALSE, tp.isDefaultProfile());
+ assertEquals("10", tp.getPoolSize().toString());
+ assertEquals("200", tp.getMaxPoolSize().toString());
+ assertEquals("CallerRuns", tp.getRejectedPolicy().toString());
+
+ main.stop();
+ }
+
+ public static class MyRouteBuilder extends RouteBuilder {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start").to("seda:foo");
+ }
+ }
+
+}