This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new db42d9c CAMEL-15197: Thread pool can use vertx executor to reuse its threading model. (#3956) db42d9c is described below commit db42d9cce6e1d3189241dae4ec37c978e8d9fc3c Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon Jun 29 13:16:39 2020 +0200 CAMEL-15197: Thread pool can use vertx executor to reuse its threading model. (#3956) --- apache-camel/src/main/descriptors/common-bin.xml | 1 + bom/camel-bom/pom.xml | 5 + .../org/apache/camel/catalog/docs.properties | 1 + .../docs/reactive-threadpoolfactory-vertx.adoc | 64 ++++++++ .../org/apache/camel/catalog/others.properties | 1 + .../catalog/others/threadpoolfactory-vertx.json | 15 ++ .../src/main/docs/reactive-executor-vertx.adoc | 23 +-- .../reactive/vertx/VertXReactiveExecutor.java | 54 ++++--- ...ockTest.java => SimpleMockLookupVertxTest.java} | 12 +- .../org/apache/camel/reactive/SimpleMockTest.java | 13 +- components/camel-threadpoolfactory-vertx/pom.xml | 77 +++++++++ .../services/org/apache/camel/other.properties | 7 + .../services/org/apache/camel/reactive-executor | 2 + .../services/org/apache/camel/thread-pool-factory | 2 + .../resources/reactive-executor-vertx.json | 15 ++ .../resources/threadpoolfactory-vertx.json | 15 ++ .../docs/reactive-threadpoolfactory-vertx.adoc | 64 ++++++++ .../reactive/vertx/VertXThreadPoolFactory.java | 179 +++++++++++++++++++++ .../camel/reactive/SplitCustomThreadPoolTest.java | 79 +++++++++ .../reactive/SplitParallelLookupVertxTest.java} | 46 +++--- .../apache/camel/reactive/SplitParallelTest.java} | 50 +++--- .../src/test/resources/log4j2.properties | 31 ++++ components/pom.xml | 1 + .../apache/camel/spi/SimpleExecutorService.java | 11 ++ .../org/apache/camel/spi/ThreadPoolFactory.java | 5 + .../camel/impl/engine/AbstractCamelContext.java | 15 +- .../impl/engine/BaseExecutorServiceManager.java | 25 ++- .../DefaultAnnotationBasedProcessorFactory.java | 2 +- 
.../camel/impl/engine/DefaultProducerTemplate.java | 3 +- .../camel/impl/engine/DefaultShutdownStrategy.java | 9 +- .../processor/aggregate/AggregateProcessor.java | 2 +- .../camel/support/DefaultThreadPoolFactory.java | 18 ++- docs/components/modules/others/nav.adoc | 1 + .../pages/reactive-threadpoolfactory-vertx.adoc | 66 ++++++++ parent/pom.xml | 5 + 35 files changed, 813 insertions(+), 106 deletions(-) diff --git a/apache-camel/src/main/descriptors/common-bin.xml b/apache-camel/src/main/descriptors/common-bin.xml index 923e39e..b54a518 100644 --- a/apache-camel/src/main/descriptors/common-bin.xml +++ b/apache-camel/src/main/descriptors/common-bin.xml @@ -364,6 +364,7 @@ <include>org.apache.camel:camel-testcontainers-junit5</include> <include>org.apache.camel:camel-testcontainers-spring</include> <include>org.apache.camel:camel-testcontainers-spring-junit5</include> + <include>org.apache.camel:camel-threadpoolfactory-vertx</include> <include>org.apache.camel:camel-thrift</include> <include>org.apache.camel:camel-tika</include> <include>org.apache.camel:camel-timer</include> diff --git a/bom/camel-bom/pom.xml b/bom/camel-bom/pom.xml index 79bb915..b81eb55 100644 --- a/bom/camel-bom/pom.xml +++ b/bom/camel-bom/pom.xml @@ -1775,6 +1775,11 @@ </dependency> <dependency> <groupId>org.apache.camel</groupId> + <artifactId>camel-threadpoolfactory-vertx</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> <artifactId>camel-thrift</artifactId> <version>${project.version}</version> </dependency> diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs.properties b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs.properties index 1784dec..d876d4e 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs.properties +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs.properties @@ -387,6 
+387,7 @@ rabbitmq-component random-eip reactive-executor-vertx reactive-streams-component +reactive-threadpoolfactory-vertx reactor recipientList-eip ref-component diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/reactive-threadpoolfactory-vertx.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/reactive-threadpoolfactory-vertx.adoc new file mode 100644 index 0000000..f04786d --- /dev/null +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/reactive-threadpoolfactory-vertx.adoc @@ -0,0 +1,64 @@ +[[threadpoolfactory-vertx-component]] += Thread Pool Factory Vert.x Component +:docTitle: ThreadPoolFactory Vert.x +:artifactId: camel-threadpoolfactory-vertx +:description: ThreadPoolFactory for camel-core using Vert.x +:since: 3.5 +:supportLevel: Preview + +*Since Camel {since}* + +The camel-threadpoolfactory-vertx is a VertX based implementation of the `ThreadPoolFactory` SPI. + +By default Camel will use its own thread pool for EIPs that can use parallel processing (such as splitter, aggregator). +You can plug in different engines via an SPI interface. This is a VertX based plugin that uses the VertX worker thread pool +(executeBlocking). + +== Restrictions + +This implementation has been designed to use VertX worker threads for EIPs where concurrency has been enabled (using default settings). +However, this only applies when the EIP is not configured with a specific thread pool.
For example the first example +below will use VertX worker threads, and the 2nd below will not: + +[source,java] +---- +from("direct:start") + .to("log:foo") + .split(body()).parallelProcessing() + .to("mock:split") + .end() + .to("mock:result"); +---- + +The following Split EIP will refer to a custom thread pool, and therefore VertX is not in use, and Camel will +use the custom thread pool: + +[source,java] +---- +// register a custom thread pool profile with id myLowPool +context.getExecutorServiceManager().registerThreadPoolProfile( + new ThreadPoolProfileBuilder("myLowPool").poolSize(2).maxPoolSize(10).build() +); + +from("direct:start") + .to("log:foo") + .split(body()).executorServiceRef("myLowPool") + .to("mock:split") + .end() + .to("mock:result"); +---- + +== VertX instance + +This implementation will first look up an existing `io.vertx.core.Vertx` instance in the registry. +Alternatively, you can configure an instance explicitly using the getter/setter on the `VertXThreadPoolFactory` class.
+ +== Auto detection from classpath + +To use this implementation all you need to do is to add the `camel-threadpoolfactory-vertx` dependency to the classpath, +and Camel should auto-detect this on startup and log as follows: + +[source,text] +---- +Using ThreadPoolFactory: camel-threadpoolfactory-vertx +---- diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others.properties b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others.properties index c708226..73a6af4 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others.properties +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others.properties @@ -33,5 +33,6 @@ testcontainers testcontainers-junit5 testcontainers-spring testcontainers-spring-junit5 +threadpoolfactory-vertx undertow-spring-security zipkin diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others/threadpoolfactory-vertx.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others/threadpoolfactory-vertx.json new file mode 100644 index 0000000..037eafb --- /dev/null +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others/threadpoolfactory-vertx.json @@ -0,0 +1,15 @@ +{ + "other": { + "kind": "other", + "name": "threadpoolfactory-vertx", + "title": "ThreadPoolFactory Vert.x", + "description": "ThreadPoolFactory for camel-core using Vert.x", + "deprecated": false, + "firstVersion": "3.5.0", + "label": "reactive", + "supportLevel": "Preview", + "groupId": "org.apache.camel", + "artifactId": "camel-threadpoolfactory-vertx", + "version": "3.5.0-SNAPSHOT" + } +} diff --git a/components/camel-reactive-executor-vertx/src/main/docs/reactive-executor-vertx.adoc b/components/camel-reactive-executor-vertx/src/main/docs/reactive-executor-vertx.adoc index d76c301..dbf107a 100644 --- 
a/components/camel-reactive-executor-vertx/src/main/docs/reactive-executor-vertx.adoc +++ b/components/camel-reactive-executor-vertx/src/main/docs/reactive-executor-vertx.adoc @@ -21,7 +21,7 @@ At this time this component is an experiment so use it with care. == VertX instance -This implementation will by default create a default `io.vertx.core.Vertx` instance to be used. +This implementation will first lookup in the registry for an existing `io.vertx.core.Vertx` to be used. However you can configure an existing instance using the getter/setter on the `VertXReactiveExecutor` class. == Auto detection from classpath @@ -31,25 +31,6 @@ and Camel should auto-detect this on startup and log as follows: [source,text] ---- -Using ReactiveExecutor: org.apache.camel.reactive.vertx.VertXReactiveExecutor@2a62b5bc +Using ReactiveExecutor: camel-reactive-executor-vertx ---- -== Manual enabling - -If you use OSGi or the implementation is not added to the classpath, you need to enable this explict such: - -[source,java] ----- -CamelContext camel = ... - -camel.setReactiveExecutor(new VertXReactiveExecutor()); ----- - -Or in XML DSL (spring or blueprint XML file) you can declare the factory as a `<bean>`: - -[source,xml] ----- -<bean id="vertxReactiveExecutor" class="org.apache.camel.reactive.vertx.VertXReactiveExecutor"/> ----- - -and then Camel should detect the bean and use the reactive executor. 
diff --git a/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java b/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java index 9fe72ec..0ada813 100644 --- a/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java +++ b/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java @@ -16,7 +16,11 @@ */ package org.apache.camel.reactive.vertx; +import java.util.Set; + import io.vertx.core.Vertx; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; import org.apache.camel.Experimental; import org.apache.camel.StaticService; import org.apache.camel.spi.ReactiveExecutor; @@ -32,12 +36,22 @@ import org.slf4j.LoggerFactory; */ @Experimental @JdkService(ReactiveExecutor.FACTORY) -public class VertXReactiveExecutor extends ServiceSupport implements ReactiveExecutor, StaticService { +public class VertXReactiveExecutor extends ServiceSupport implements CamelContextAware, ReactiveExecutor, StaticService { private static final Logger LOG = LoggerFactory.getLogger(VertXReactiveExecutor.class); + private CamelContext camelContext; private Vertx vertx; - private boolean shouldClose; + + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } public Vertx getVertx() { return vertx; @@ -51,6 +65,25 @@ public class VertXReactiveExecutor extends ServiceSupport implements ReactiveExe } @Override + protected void doInit() throws Exception { + super.doInit(); + if (vertx == null) { + Set<Vertx> set = getCamelContext().getRegistry().findByType(Vertx.class); + if (set.size() == 1) { + vertx = set.iterator().next(); + } + } + } + + @Override + protected void doStart() throws Exception { + 
super.doStart(); + if (vertx == null) { + throw new IllegalArgumentException("VertX instance must be configured."); + } + } + + @Override public void schedule(Runnable runnable) { LOG.trace("schedule: {}", runnable); vertx.nettyEventLoopGroup().execute(runnable); @@ -79,23 +112,6 @@ public class VertXReactiveExecutor extends ServiceSupport implements ReactiveExe } @Override - protected void doStart() throws Exception { - if (vertx == null) { - LOG.debug("Starting VertX"); - shouldClose = true; - vertx = Vertx.vertx(); - } - } - - @Override - protected void doStop() throws Exception { - if (vertx != null && shouldClose) { - LOG.debug("Stopping VertX"); - vertx.close(); - } - } - - @Override public String toString() { return "camel-reactive-executor-vertx"; } diff --git a/components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java b/components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockLookupVertxTest.java similarity index 85% copy from components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java copy to components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockLookupVertxTest.java index 5e1c01b..79683fc 100644 --- a/components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java +++ b/components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockLookupVertxTest.java @@ -16,21 +16,21 @@ */ package org.apache.camel.reactive; +import io.vertx.core.Vertx; import org.apache.camel.CamelContext; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.impl.engine.AbstractCamelContext; -import org.apache.camel.reactive.vertx.VertXReactiveExecutor; import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Test; -public class 
SimpleMockTest extends CamelTestSupport { +public class SimpleMockLookupVertxTest extends CamelTestSupport { + + private final Vertx vertx = Vertx.vertx(); @Override protected CamelContext createCamelContext() throws Exception { - AbstractCamelContext context = new DefaultCamelContext(); - context.setReactiveExecutor(new VertXReactiveExecutor()); + CamelContext context = super.createCamelContext(); + context.getRegistry().bind("vertx", vertx); return context; } diff --git a/components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java b/components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java index 5e1c01b..2a39514 100644 --- a/components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java +++ b/components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java @@ -16,21 +16,26 @@ */ package org.apache.camel.reactive; +import io.vertx.core.Vertx; import org.apache.camel.CamelContext; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.impl.engine.AbstractCamelContext; import org.apache.camel.reactive.vertx.VertXReactiveExecutor; import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Test; public class SimpleMockTest extends CamelTestSupport { + private final Vertx vertx = Vertx.vertx(); + @Override protected CamelContext createCamelContext() throws Exception { - AbstractCamelContext context = new DefaultCamelContext(); - context.setReactiveExecutor(new VertXReactiveExecutor()); + CamelContext context = super.createCamelContext(); + + VertXReactiveExecutor re = (VertXReactiveExecutor) context.adapt(ExtendedCamelContext.class).getReactiveExecutor(); + re.setVertx(vertx); + return context; } diff --git 
a/components/camel-threadpoolfactory-vertx/pom.xml b/components/camel-threadpoolfactory-vertx/pom.xml new file mode 100644 index 0000000..45fde68 --- /dev/null +++ b/components/camel-threadpoolfactory-vertx/pom.xml @@ -0,0 +1,77 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.camel</groupId> + <artifactId>components</artifactId> + <version>3.5.0-SNAPSHOT</version> + </parent> + + <artifactId>camel-threadpoolfactory-vertx</artifactId> + <packaging>jar</packaging> + <name>Camel :: Thread Pool Factory :: Vert.x</name> + <description>ThreadPoolFactory for camel-core using Vert.x</description> + + <properties> + <firstVersion>3.5.0</firstVersion> + <label>reactive</label> + <title>ThreadPoolFactory Vert.x</title> + <supportLevel>preview</supportLevel> + </properties> + + <dependencies> + + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-support</artifactId> + </dependency> + + <dependency> + <groupId>io.vertx</groupId> + <artifactId>vertx-core</artifactId> + <version>${vertx-version}</version> + </dependency> + + <!-- testing --> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-mock</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + +</project> diff --git a/components/camel-threadpoolfactory-vertx/src/generated/resources/META-INF/services/org/apache/camel/other.properties b/components/camel-threadpoolfactory-vertx/src/generated/resources/META-INF/services/org/apache/camel/other.properties new file mode 100644 index 0000000..c9aede5 --- /dev/null +++ 
b/components/camel-threadpoolfactory-vertx/src/generated/resources/META-INF/services/org/apache/camel/other.properties @@ -0,0 +1,7 @@ +# Generated by camel build tools - do NOT edit this file! +name=threadpoolfactory-vertx +groupId=org.apache.camel +artifactId=camel-threadpoolfactory-vertx +version=3.5.0-SNAPSHOT +projectName=Camel :: Thread Pool Factory :: Vert.x +projectDescription=ThreadPoolFactory for camel-core using Vert.x diff --git a/components/camel-threadpoolfactory-vertx/src/generated/resources/META-INF/services/org/apache/camel/reactive-executor b/components/camel-threadpoolfactory-vertx/src/generated/resources/META-INF/services/org/apache/camel/reactive-executor new file mode 100644 index 0000000..1e1e324 --- /dev/null +++ b/components/camel-threadpoolfactory-vertx/src/generated/resources/META-INF/services/org/apache/camel/reactive-executor @@ -0,0 +1,2 @@ +# Generated by camel build tools - do NOT edit this file! +class=org.apache.camel.reactive.vertx.VertXReactiveExecutor diff --git a/components/camel-threadpoolfactory-vertx/src/generated/resources/META-INF/services/org/apache/camel/thread-pool-factory b/components/camel-threadpoolfactory-vertx/src/generated/resources/META-INF/services/org/apache/camel/thread-pool-factory new file mode 100644 index 0000000..b938ef6 --- /dev/null +++ b/components/camel-threadpoolfactory-vertx/src/generated/resources/META-INF/services/org/apache/camel/thread-pool-factory @@ -0,0 +1,2 @@ +# Generated by camel build tools - do NOT edit this file! 
+class=org.apache.camel.reactive.vertx.VertXThreadPoolFactory diff --git a/components/camel-threadpoolfactory-vertx/src/generated/resources/reactive-executor-vertx.json b/components/camel-threadpoolfactory-vertx/src/generated/resources/reactive-executor-vertx.json new file mode 100644 index 0000000..217160b --- /dev/null +++ b/components/camel-threadpoolfactory-vertx/src/generated/resources/reactive-executor-vertx.json @@ -0,0 +1,15 @@ +{ + "other": { + "kind": "other", + "name": "reactive-executor-vertx", + "title": "Reactive Executor Vert.x", + "description": "Reactive Executor for camel-core using Vert.x", + "deprecated": false, + "firstVersion": "3.0.0", + "label": "reactive", + "supportLevel": "Experimental", + "groupId": "org.apache.camel", + "artifactId": "camel-reactive-executor-vertx", + "version": "3.5.0-SNAPSHOT" + } +} diff --git a/components/camel-threadpoolfactory-vertx/src/generated/resources/threadpoolfactory-vertx.json b/components/camel-threadpoolfactory-vertx/src/generated/resources/threadpoolfactory-vertx.json new file mode 100644 index 0000000..037eafb --- /dev/null +++ b/components/camel-threadpoolfactory-vertx/src/generated/resources/threadpoolfactory-vertx.json @@ -0,0 +1,15 @@ +{ + "other": { + "kind": "other", + "name": "threadpoolfactory-vertx", + "title": "ThreadPoolFactory Vert.x", + "description": "ThreadPoolFactory for camel-core using Vert.x", + "deprecated": false, + "firstVersion": "3.5.0", + "label": "reactive", + "supportLevel": "Preview", + "groupId": "org.apache.camel", + "artifactId": "camel-threadpoolfactory-vertx", + "version": "3.5.0-SNAPSHOT" + } +} diff --git a/components/camel-threadpoolfactory-vertx/src/main/docs/reactive-threadpoolfactory-vertx.adoc b/components/camel-threadpoolfactory-vertx/src/main/docs/reactive-threadpoolfactory-vertx.adoc new file mode 100644 index 0000000..9a487b1 --- /dev/null +++ b/components/camel-threadpoolfactory-vertx/src/main/docs/reactive-threadpoolfactory-vertx.adoc @@ -0,0 +1,64 @@ 
+[[threadpoolfactory-vertx-component]] += Thread Pool Factory Vert.x Component +:docTitle: ThreadPoolFactory Vert.x +:artifactId: camel-threadpoolfactory-vertx +:description: ThreadPoolFactory for camel-core using Vert.x +:since: 3.5 +:supportLevel: Preview + +*Since Camel {since}* + +The camel-threadpoolfactory-vertx is a VertX based implementation of the `ThreadPoolFactory` SPI. + +By default Camel will use its own thread pool for EIPs that can use parallel processing (such as splitter, aggregator). +You can plug in different engines via an SPI interface. This is a VertX based plugin that uses the VertX worker thread pool +(executeBlocking). + +== Restrictions + +This implementation has been designed to use VertX worker threads for EIPs where concurrency has been enabled (using default settings). +However, this only applies when the EIP is not configured with a specific thread pool. For example the first example +below will use VertX worker threads, and the 2nd below will not: + +[source,java] +---- +from("direct:start") + .to("log:foo") + .split(body()).parallelProcessing() + .to("mock:split") + .end() + .to("mock:result"); +---- + +The following Split EIP will refer to a custom thread pool, and therefore VertX is not in use, and Camel will +use the custom thread pool: + +[source,java] +---- +// register a custom thread pool profile with id myLowPool +context.getExecutorServiceManager().registerThreadPoolProfile( + new ThreadPoolProfileBuilder("myLowPool").poolSize(2).maxPoolSize(10).build() +); + +from("direct:start") + .to("log:foo") + .split(body()).executorServiceRef("myLowPool") + .to("mock:split") + .end() + .to("mock:result"); +---- + +== VertX instance + +This implementation will first look up an existing `io.vertx.core.Vertx` instance in the registry. +Alternatively, you can configure an instance explicitly using the getter/setter on the `VertXThreadPoolFactory` class.
+ +== Auto detection from classpath + +To use this implementation all you need to do is to add the `camel-threadpoolfactory-vertx` dependency to the classpath, +and Camel should auto-detect this on startup and log as follows: + +[source,text] +---- +Using ThreadPoolFactory: camel-threadpoolfactory-vertx +---- diff --git a/components/camel-threadpoolfactory-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXThreadPoolFactory.java b/components/camel-threadpoolfactory-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXThreadPoolFactory.java new file mode 100644 index 0000000..5c72b86 --- /dev/null +++ b/components/camel-threadpoolfactory-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXThreadPoolFactory.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.reactive.vertx; + +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import io.vertx.core.Vertx; +import org.apache.camel.spi.SimpleExecutorService; +import org.apache.camel.spi.ThreadPoolFactory; +import org.apache.camel.spi.ThreadPoolProfile; +import org.apache.camel.spi.annotations.JdkService; +import org.apache.camel.support.DefaultThreadPoolFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@JdkService(ThreadPoolFactory.FACTORY) +public class VertXThreadPoolFactory extends DefaultThreadPoolFactory implements ThreadPoolFactory { + + private static final Logger LOG = LoggerFactory.getLogger(VertXThreadPoolFactory.class); + + private final ExecutorService vertxExecutorService = new VertXExecutorService(); + private Vertx vertx; + + public Vertx getVertx() { + return vertx; + } + + /** + * To use an existing instance of {@link Vertx} instead of creating a default instance. 
+ */ + public void setVertx(Vertx vertx) { + this.vertx = vertx; + } + + @Override + protected void doInit() throws Exception { + super.doInit(); + if (vertx == null) { + Set<Vertx> set = getCamelContext().getRegistry().findByType(Vertx.class); + if (set.size() == 1) { + vertx = set.iterator().next(); + } + } + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + if (vertx == null) { + throw new IllegalArgumentException("VertX instance must be configured."); + } + } + + @Override + public ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) { + // only do this for default profile (which mean its not a custom pool) + if (profile.isDefaultProfile()) { + return vertxExecutorService; + } else { + return super.newThreadPool(profile, threadFactory); + } + } + + @Override + public ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { + return vertxExecutorService; + } + + @Override + public String toString() { + return "camel-threadpoolfactory-vertx"; + } + + private class VertXExecutorService implements ExecutorService, SimpleExecutorService { + + @Override + public void shutdown() { + // noop + } + + @Override + public List<Runnable> shutdownNow() { + // noop + return null; + } + + @Override + public boolean isShutdown() { + return false; + } + + @Override + public boolean isTerminated() { + return false; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return false; + } + + @Override + public <T> Future<T> submit(Callable<T> task) { + return null; + } + + @Override + public <T> Future<T> submit(Runnable task, T result) { + return null; + } + + @Override + public Future<?> submit(Runnable task) { + LOG.trace("submit: {}", task); + final CompletableFuture<?> answer = new CompletableFuture<>(); + // used by vertx + vertx.executeBlocking(future -> { + task.run(); + future.complete(); + }, res -> answer.complete(null)); + return answer; 
+ } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { + return null; + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { + return null; + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { + return null; + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return null; + } + + @Override + public void execute(Runnable command) { + LOG.trace("execute: {}", command); + // used by vertx + vertx.executeBlocking(future -> { + command.run(); + future.complete(); + }, null); + } + } + +} diff --git a/components/camel-threadpoolfactory-vertx/src/test/java/org/apache/camel/reactive/SplitCustomThreadPoolTest.java b/components/camel-threadpoolfactory-vertx/src/test/java/org/apache/camel/reactive/SplitCustomThreadPoolTest.java new file mode 100644 index 0000000..2888df0 --- /dev/null +++ b/components/camel-threadpoolfactory-vertx/src/test/java/org/apache/camel/reactive/SplitCustomThreadPoolTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.reactive; + +import io.vertx.core.Vertx; +import org.apache.camel.CamelContext; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.builder.ThreadPoolProfileBuilder; +import org.apache.camel.reactive.vertx.VertXThreadPoolFactory; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Assert; +import org.junit.Test; + +public class SplitCustomThreadPoolTest extends CamelTestSupport { + + private final Vertx vertx = Vertx.vertx(); + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + + VertXThreadPoolFactory tpf = (VertXThreadPoolFactory) context.getExecutorServiceManager().getThreadPoolFactory(); + tpf.setVertx(vertx); + + return context; + } + + @Test + public void testSplit() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("A,B,C,D,E,F,G,H,I,J"); + getMockEndpoint("mock:split").expectedBodiesReceivedInAnyOrder("A", "B", "C", "D", "E", "F", "G", "H", "I", "J"); + + template.sendBody("direct:start", "A,B,C,D,E,F,G,H,I,J"); + + assertMockEndpointsSatisfied(); + + vertx.close(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // register a custom thread pool profile with id myLowPool + context.getExecutorServiceManager().registerThreadPoolProfile( + new ThreadPoolProfileBuilder("myLowPool").poolSize(2).maxPoolSize(10).build() + ); + + from("direct:start") + 
.to("log:foo") + .split(body()).executorServiceRef("myLowPool") + .to("log:bar") + .process(e -> { + String name = Thread.currentThread().getName(); + Assert.assertTrue("Should use Camel thread", name.startsWith("Camel")); + }) + .to("mock:split") + .end() + .to("log:result") + .to("mock:result"); + } + }; + } +} diff --git a/components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java b/components/camel-threadpoolfactory-vertx/src/test/java/org/apache/camel/reactive/SplitParallelLookupVertxTest.java similarity index 54% copy from components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java copy to components/camel-threadpoolfactory-vertx/src/test/java/org/apache/camel/reactive/SplitParallelLookupVertxTest.java index 5e1c01b..39ec41f 100644 --- a/components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java +++ b/components/camel-threadpoolfactory-vertx/src/test/java/org/apache/camel/reactive/SplitParallelLookupVertxTest.java @@ -16,43 +16,34 @@ */ package org.apache.camel.reactive; +import io.vertx.core.Vertx; import org.apache.camel.CamelContext; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.impl.engine.AbstractCamelContext; -import org.apache.camel.reactive.vertx.VertXReactiveExecutor; import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Assert; import org.junit.Test; -public class SimpleMockTest extends CamelTestSupport { +public class SplitParallelLookupVertxTest extends CamelTestSupport { + + private final Vertx vertx = Vertx.vertx(); @Override protected CamelContext createCamelContext() throws Exception { - AbstractCamelContext context = new DefaultCamelContext(); - context.setReactiveExecutor(new VertXReactiveExecutor()); + CamelContext context = super.createCamelContext(); + 
context.getRegistry().bind("vertx", vertx); return context; } @Test - public void testSimple() throws Exception { - MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedBodiesReceived("Hello World"); + public void testSplit() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("A,B,C,D,E,F,G,H,I,J"); + getMockEndpoint("mock:split").expectedBodiesReceivedInAnyOrder("A", "B", "C", "D", "E", "F", "G", "H", "I", "J"); - template.sendBody("direct:start", "Hello World"); + template.sendBody("direct:start", "A,B,C,D,E,F,G,H,I,J"); assertMockEndpointsSatisfied(); - } - - @Test - public void testSimpleTwoMessages() throws Exception { - MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedBodiesReceived("Hello World", "Bye World"); - - template.sendBody("direct:start", "Hello World"); - template.sendBody("direct:start", "Bye World"); - assertMockEndpointsSatisfied(); + vertx.close(); } @Override @@ -60,7 +51,18 @@ public class SimpleMockTest extends CamelTestSupport { return new RouteBuilder() { @Override public void configure() throws Exception { - from("direct:start").to("log:foo").to("log:bar").to("mock:result"); + from("direct:start") + .to("log:foo") + .split(body()).parallelProcessing() + .to("log:bar") + .process(e -> { + String name = Thread.currentThread().getName(); + Assert.assertTrue("Should use vertx thread", name.startsWith("vert.x-worker-thread")); + }) + .to("mock:split") + .end() + .to("log:result") + .to("mock:result"); } }; } diff --git a/components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java b/components/camel-threadpoolfactory-vertx/src/test/java/org/apache/camel/reactive/SplitParallelTest.java similarity index 51% copy from components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java copy to components/camel-threadpoolfactory-vertx/src/test/java/org/apache/camel/reactive/SplitParallelTest.java index 5e1c01b..30dacdf 
100644 --- a/components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java +++ b/components/camel-threadpoolfactory-vertx/src/test/java/org/apache/camel/reactive/SplitParallelTest.java @@ -16,43 +16,38 @@ */ package org.apache.camel.reactive; +import io.vertx.core.Vertx; import org.apache.camel.CamelContext; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.impl.engine.AbstractCamelContext; -import org.apache.camel.reactive.vertx.VertXReactiveExecutor; +import org.apache.camel.reactive.vertx.VertXThreadPoolFactory; import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Assert; import org.junit.Test; -public class SimpleMockTest extends CamelTestSupport { +public class SplitParallelTest extends CamelTestSupport { + + private final Vertx vertx = Vertx.vertx(); @Override protected CamelContext createCamelContext() throws Exception { - AbstractCamelContext context = new DefaultCamelContext(); - context.setReactiveExecutor(new VertXReactiveExecutor()); - return context; - } - - @Test - public void testSimple() throws Exception { - MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedBodiesReceived("Hello World"); + CamelContext context = super.createCamelContext(); - template.sendBody("direct:start", "Hello World"); + VertXThreadPoolFactory tpf = (VertXThreadPoolFactory) context.getExecutorServiceManager().getThreadPoolFactory(); + tpf.setVertx(vertx); - assertMockEndpointsSatisfied(); + return context; } @Test - public void testSimpleTwoMessages() throws Exception { - MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedBodiesReceived("Hello World", "Bye World"); + public void testSplit() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("A,B,C,D,E,F,G,H,I,J"); + getMockEndpoint("mock:split").expectedBodiesReceivedInAnyOrder("A", "B", 
"C", "D", "E", "F", "G", "H", "I", "J"); - template.sendBody("direct:start", "Hello World"); - template.sendBody("direct:start", "Bye World"); + template.sendBody("direct:start", "A,B,C,D,E,F,G,H,I,J"); assertMockEndpointsSatisfied(); + + vertx.close(); } @Override @@ -60,7 +55,18 @@ public class SimpleMockTest extends CamelTestSupport { return new RouteBuilder() { @Override public void configure() throws Exception { - from("direct:start").to("log:foo").to("log:bar").to("mock:result"); + from("direct:start") + .to("log:foo") + .split(body()).parallelProcessing() + .to("log:bar") + .process(e -> { + String name = Thread.currentThread().getName(); + Assert.assertTrue("Should use vertx thread", name.startsWith("vert.x-worker-thread")); + }) + .to("mock:split") + .end() + .to("log:result") + .to("mock:result"); } }; } diff --git a/components/camel-threadpoolfactory-vertx/src/test/resources/log4j2.properties b/components/camel-threadpoolfactory-vertx/src/test/resources/log4j2.properties new file mode 100644 index 0000000..b06ea95 --- /dev/null +++ b/components/camel-threadpoolfactory-vertx/src/test/resources/log4j2.properties @@ -0,0 +1,31 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +appender.out.type = File +appender.out.name = out +appender.out.fileName = target/camel-threadpoolfactory-vertx.log +appender.out.layout.type = PatternLayout +appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n +appender.stdout.type = Console +appender.stdout.name = stdout +appender.stdout.layout.type = PatternLayout +appender.stdout.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n + +rootLogger.level = INFO + +rootLogger.appenderRef.out.ref = out +#rootLogger.appenderRef.out.ref = stdout diff --git a/components/pom.xml b/components/pom.xml index bb3aeaf..26fd88b 100644 --- a/components/pom.xml +++ b/components/pom.xml @@ -354,6 +354,7 @@ <module>camel-tagsoup</module> <module>camel-tarfile</module> <module>camel-telegram</module> + <module>camel-threadpoolfactory-vertx</module> <module>camel-thrift</module> <module>camel-tika</module> <module>camel-twilio</module> diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/SimpleExecutorService.java b/core/camel-api/src/main/java/org/apache/camel/spi/SimpleExecutorService.java new file mode 100644 index 0000000..e3ff3eb --- /dev/null +++ b/core/camel-api/src/main/java/org/apache/camel/spi/SimpleExecutorService.java @@ -0,0 +1,11 @@ +package org.apache.camel.spi; + +import java.util.concurrent.ExecutorService; + +/** + * Marker interface to signal that a {@link ExecutorService} is simple and tasks are either + * only submitted via {@link ExecutorService#submit(Runnable)} or executed + * via {@link ExecutorService#execute(Runnable)} methods. 
+ */ +public interface SimpleExecutorService { +} diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ThreadPoolFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ThreadPoolFactory.java index 45492bc..a25a539 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/ThreadPoolFactory.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/ThreadPoolFactory.java @@ -29,6 +29,11 @@ import java.util.concurrent.ThreadFactory; public interface ThreadPoolFactory { /** + * Service factory key. + */ + String FACTORY = "thread-pool-factory"; + + /** * Creates a new cached thread pool * <p/> * The cached thread pool is a term from the JDK from the method {@link java.util.concurrent.Executors#newCachedThreadPool()}. diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java index d1a0012..9a650cf 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java @@ -2794,6 +2794,13 @@ public abstract class AbstractCamelContext extends BaseService LOG.debug("Using ReactiveExecutor: {}", getReactiveExecutor()); } + // lets log at INFO level if we are not using the default thread pool factory + if (!getExecutorServiceManager().getThreadPoolFactory().getClass().getSimpleName().equals("DefaultThreadPoolFactory")) { + LOG.info("Using ThreadPoolFactory: {}", getExecutorServiceManager().getThreadPoolFactory()); + } else { + LOG.debug("Using ThreadPoolFactory: {}", getExecutorServiceManager().getThreadPoolFactory()); + } + HealthCheckRegistry hcr = getExtension(HealthCheckRegistry.class); if (hcr != null && hcr.isEnabled()) { LOG.info("Using HealthCheck: {}", hcr.getId()); @@ -2926,15 +2933,17 @@ public abstract class AbstractCamelContext extends BaseService } } - // shutdown executor service, reactive 
executor and management as the last one - shutdownServices(executorServiceManager); - shutdownServices(reactiveExecutor); + // shutdown management and lifecycle after all other services shutdownServices(managementStrategy); shutdownServices(managementMBeanAssembler); shutdownServices(lifecycleStrategies); // do not clear lifecycleStrategies as we can start Camel again and get // the route back as before + // shutdown executor service, reactive executor last + shutdownServices(executorServiceManager); + shutdownServices(reactiveExecutor); + // shutdown type converter as late as possible ServiceHelper.stopService(typeConverter); ServiceHelper.stopService(typeConverterRegistry); diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java index 371c40b..e8d6404 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java @@ -30,14 +30,17 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; import org.apache.camel.NamedNode; import org.apache.camel.StaticService; import org.apache.camel.spi.ExecutorServiceManager; import org.apache.camel.spi.LifecycleStrategy; +import org.apache.camel.spi.ReactiveExecutor; import org.apache.camel.spi.ThreadPoolFactory; import org.apache.camel.spi.ThreadPoolProfile; import org.apache.camel.support.CamelContextHelper; import org.apache.camel.support.DefaultThreadPoolFactory; +import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.support.service.ServiceSupport; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.StopWatch; @@ -58,7 +61,7 @@ public class BaseExecutorServiceManager extends ServiceSupport 
implements Execut private static final Logger LOG = LoggerFactory.getLogger(BaseExecutorServiceManager.class); private final CamelContext camelContext; - private ThreadPoolFactory threadPoolFactory = new DefaultThreadPoolFactory(); + private ThreadPoolFactory threadPoolFactory; private final List<ExecutorService> executorServices = new CopyOnWriteArrayList<>(); private String threadNamePattern; private long shutdownAwaitTermination = 10000; @@ -427,10 +430,28 @@ public class BaseExecutorServiceManager extends ServiceSupport implements Execut @Override protected void doInit() throws Exception { super.doInit(); + if (threadNamePattern == null) { // set default name pattern which includes the camel context name threadNamePattern = "Camel (" + camelContext.getName() + ") thread ##counter# - #name#"; } + + // discover thread pool factory + if (threadPoolFactory == null) { + threadPoolFactory = new BaseServiceResolver<>(ThreadPoolFactory.FACTORY, ThreadPoolFactory.class) + .resolve(camelContext) + .orElseGet(DefaultThreadPoolFactory::new); + } + if (threadPoolFactory instanceof CamelContextAware) { + ((CamelContextAware) threadPoolFactory).setCamelContext(camelContext); + } + ServiceHelper.initService(threadPoolFactory); + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + ServiceHelper.startService(threadPoolFactory); } @Override @@ -478,6 +499,8 @@ public class BaseExecutorServiceManager extends ServiceSupport implements Execut it.remove(); } } + + ServiceHelper.stopAndShutdownServices(threadPoolFactory); } /** diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAnnotationBasedProcessorFactory.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAnnotationBasedProcessorFactory.java index 6aeb5e5..468b9cf 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAnnotationBasedProcessorFactory.java +++ 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAnnotationBasedProcessorFactory.java @@ -54,7 +54,7 @@ public final class DefaultAnnotationBasedProcessorFactory implements AnnotationB recipientList.setShareUnitOfWork(annotation.shareUnitOfWork()); if (ObjectHelper.isNotEmpty(annotation.executorServiceRef())) { - ExecutorService executor = camelContext.getExecutorServiceManager().newDefaultThreadPool(this, annotation.executorServiceRef()); + ExecutorService executor = camelContext.getExecutorServiceManager().newThreadPool(this, "@RecipientList", annotation.executorServiceRef()); recipientList.setExecutorService(executor); } diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerTemplate.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerTemplate.java index ec30247..8b9f6c1 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerTemplate.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerTemplate.java @@ -782,8 +782,7 @@ public class DefaultProducerTemplate extends ServiceSupport implements ProducerT synchronized (lock) { if (executor == null) { if (threadedAsyncMode) { - executor = ObjectHelper.notNull(camelContext.getExecutorServiceManager().newDefaultThreadPool(this, "ProducerTemplate"), - "ExecutorService"); + executor = camelContext.getExecutorServiceManager().newDefaultThreadPool(this, "ProducerTemplate"); } else { executor = new SynchronousExecutorService(); } diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultShutdownStrategy.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultShutdownStrategy.java index 4210f28..25597e8 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultShutdownStrategy.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultShutdownStrategy.java @@ -30,6 +30,7 @@ import java.util.Set; import 
java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -199,10 +200,12 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS // use another thread to perform the shutdowns so we can support timeout timeoutOccurred.set(false); - currentShutdownTaskFuture = getExecutorService().submit(new ShutdownTask(context, routesOrdered, timeout, timeUnit, suspendOnly, - abortAfterTimeout, timeoutOccurred, isLogInflightExchangesOnTimeout())); try { + currentShutdownTaskFuture = getExecutorService().submit(new ShutdownTask(context, routesOrdered, timeout, timeUnit, suspendOnly, + abortAfterTimeout, timeoutOccurred, isLogInflightExchangesOnTimeout())); currentShutdownTaskFuture.get(timeout, timeUnit); + } catch (RejectedExecutionException e) { + // the task was rejected } catch (ExecutionException e) { // unwrap execution exception throw RuntimeCamelException.wrapRuntimeCamelException(e.getCause()); @@ -430,7 +433,7 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS private ExecutorService getExecutorService() { if (executor == null) { // use a thread pool that allow to terminate idle threads so they do not hang around forever - executor = camelContext.getExecutorServiceManager().newThreadPool(this, "ShutdownTask", 0, 1); + executor = camelContext.getExecutorServiceManager().newSingleThreadExecutor(this, "ShutdownTask"); } return executor; } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index aada58c..e14b243 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ 
b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -1541,7 +1541,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat if (optimisticLocking) { lock = NoLock.INSTANCE; if (getOptimisticLockingExecutorService() == null) { - setOptimisticLockingExecutorService(camelContext.getExecutorServiceManager().newScheduledThreadPool(this, AGGREGATE_OPTIMISTIC_LOCKING_EXECUTOR, 1)); + setOptimisticLockingExecutorService(camelContext.getExecutorServiceManager().newSingleThreadScheduledExecutor(this, AGGREGATE_OPTIMISTIC_LOCKING_EXECUTOR)); shutdownOptimisticLockingExecutorService = true; } } else { diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultThreadPoolFactory.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultThreadPoolFactory.java index 4bb7f23..5e1246b 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultThreadPoolFactory.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultThreadPoolFactory.java @@ -28,8 +28,12 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.StaticService; import org.apache.camel.spi.ThreadPoolFactory; import org.apache.camel.spi.ThreadPoolProfile; +import org.apache.camel.support.service.ServiceSupport; import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor; import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor; import org.apache.camel.util.concurrent.SizedScheduledExecutorService; @@ -37,7 +41,19 @@ import org.apache.camel.util.concurrent.SizedScheduledExecutorService; /** * Factory for thread pools that uses the JDK {@link Executors} for creating the thread pools. 
*/ -public class DefaultThreadPoolFactory implements ThreadPoolFactory { +public class DefaultThreadPoolFactory extends ServiceSupport implements CamelContextAware, ThreadPoolFactory, StaticService { + + private CamelContext camelContext; + + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } @Override public ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { diff --git a/docs/components/modules/others/nav.adoc b/docs/components/modules/others/nav.adoc index 7410a5f..f0914d2 100644 --- a/docs/components/modules/others/nav.adoc +++ b/docs/components/modules/others/nav.adoc @@ -38,5 +38,6 @@ ** xref:testcontainers-junit5.adoc[Testcontainers JUnit5] ** xref:testcontainers-spring.adoc[Testcontainers Spring] ** xref:testcontainers-spring-junit5.adoc[Testcontainers Spring Junit5] +** xref:reactive-threadpoolfactory-vertx.adoc[ThreadPoolFactory Vert.x] ** xref:undertow-spring-security.adoc[Undertow Spring Security] ** xref:zipkin.adoc[Zipkin] diff --git a/docs/components/modules/others/pages/reactive-threadpoolfactory-vertx.adoc b/docs/components/modules/others/pages/reactive-threadpoolfactory-vertx.adoc new file mode 100644 index 0000000..cf33fda --- /dev/null +++ b/docs/components/modules/others/pages/reactive-threadpoolfactory-vertx.adoc @@ -0,0 +1,66 @@ +[[threadpoolfactory-vertx-component]] += Thread Pool Factory Vert.x Component +//THIS FILE IS COPIED: EDIT THE SOURCE FILE: +:page-source: components/camel-threadpoolfactory-vertx/src/main/docs/reactive-threadpoolfactory-vertx.adoc +:docTitle: ThreadPoolFactory Vert.x +:artifactId: camel-threadpoolfactory-vertx +:description: ThreadPoolFactory for camel-core using Vert.x +:since: 3.5 +:supportLevel: Preview + +*Since Camel {since}* + +The camel-threadpoolfactory-vertx is a VertX based implementation of the `ThreadPoolFactory` SPI. 
+ +By default Camel will use its own thread pool for EIPs that can use parallel processing (such as splitter, aggregator). +You can plug in different engines via an SPI interface. This is a VertX based plugin that uses the VertX worker thread pool +(executeBlocking). + +== Restrictions + +This implementation has been designed to use VertX worker threads for EIPs where concurrency has been enabled (using default settings). +However, this only applies when the EIP is not configured with a specific thread pool. For example, the first example +below will use VertX worker threads, and the second will not: + +[source,java] +---- +from("direct:start") + .to("log:foo") + .split(body()).parallelProcessing() + .to("mock:split") + .end() + .to("mock:result"); +---- + +The following Split EIP will refer to a custom thread pool, and therefore VertX is not in use, and Camel will +use the custom thread pool: + +[source,java] +---- +// register a custom thread pool profile with id myLowPool +context.getExecutorServiceManager().registerThreadPoolProfile( + new ThreadPoolProfileBuilder("myLowPool").poolSize(2).maxPoolSize(10).build() +); + +from("direct:start") + .to("log:foo") + .split(body()).executorServiceRef("myLowPool") + .to("mock:split") + .end() + .to("mock:result"); +---- + +== VertX instance + +This implementation will by default create a default `io.vertx.core.Vertx` instance to be used. +However, you can configure an existing instance using the getter/setter on the `VertXThreadPoolFactory` class. 
+ +== Auto detection from classpath + +To use this implementation all you need to do is to add the `camel-threadpoolfactory-vertx` dependency to the classpath, +and Camel should auto-detect this on startup and log as follows: + +[source,text] +---- +Using ThreadPoolFactory: camel-threadpoolfactory-vertx +---- diff --git a/parent/pom.xml b/parent/pom.xml index c2f50c3..c62842f 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -2364,6 +2364,11 @@ </dependency> <dependency> <groupId>org.apache.camel</groupId> + <artifactId>camel-threadpoolfactory-vertx</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> <artifactId>camel-thrift</artifactId> <version>${project.version}</version> </dependency>