This is an automated email from the ASF dual-hosted git repository. nfilotto pushed a commit to branch virtual-threads-basic-support in repository https://gitbox.apache.org/repos/asf/camel.git
commit 34f3abb8fbea9df27f69a9665c6bb6d41588626b Author: Nicolas Filotto <nfilo...@talend.com> AuthorDate: Fri Dec 1 18:13:12 2023 +0100 CAMEL-20187: Add basic support of virtual threads --- Jenkinsfile.jdk21 | 3 +- .../impl/DefaultExecutorServiceManagerTest.java | 3 + .../camel/processor/WireTapAbortPolicyTest.java | 3 + .../AggregateTimeoutWithNoExecutorServiceTest.java | 3 + .../DualManagedThreadPoolProfileTest.java | 3 + .../DualManagedThreadPoolWithIdTest.java | 3 + .../management/ManagedRouteRemoveWireTapTest.java | 3 + .../management/ManagedThreadPoolProfileTest.java | 3 + .../camel/management/ManagedThreadPoolTest.java | 3 + .../management/ManagedThreadPoolWithIdTest.java | 3 + core/camel-support/pom.xml | 49 +++++ .../camel/support/DefaultThreadPoolFactory.java | 208 +++++++++++++++++++++ core/camel-util/pom.xml | 47 +++++ .../util/concurrent/ThreadFactoryTypeAware.java | 32 ++++ .../apache/camel/util/concurrent/ThreadType.java | 30 +++ .../camel/util/concurrent/CamelThreadFactory.java | 91 +++++++++ .../apache/camel/util/concurrent/ThreadType.java | 38 ++++ .../modules/ROOT/pages/threading-model.adoc | 12 ++ 18 files changed, 536 insertions(+), 1 deletion(-) diff --git a/Jenkinsfile.jdk21 b/Jenkinsfile.jdk21 index 5c25cfd3973..b4a902dcf0c 100644 --- a/Jenkinsfile.jdk21 +++ b/Jenkinsfile.jdk21 @@ -43,6 +43,7 @@ pipeline { parameters { booleanParam(name: 'CLEAN', defaultValue: true, description: 'Perform the build in clean workspace') + booleanParam(name: 'VIRTUAL_THREAD', defaultValue: false, description: 'Perform the build using virtual threads') } stages { @@ -73,7 +74,7 @@ pipeline { stage('Test') { steps { timeout(unit: 'HOURS', time: 7) { - sh "./mvnw $MAVEN_PARAMS $MAVEN_TEST_PARAMS -Darchetype.test.skip -Dmaven.test.failure.ignore=true -Dcheckstyle.skip=true verify" + sh "./mvnw $MAVEN_PARAMS $MAVEN_TEST_PARAMS -Darchetype.test.skip -Dmaven.test.failure.ignore=true -Dcheckstyle.skip=true verify -Dcamel.threads.virtual.enabled=${params.VIRTUAL_THREAD}" } } post { diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java index 3913414d050..8d3f98ef00b 100644 --- a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java @@ -27,6 +27,7 @@ import org.apache.camel.util.concurrent.SizedScheduledExecutorService; import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledIfSystemProperty; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -37,6 +38,8 @@ import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true", + disabledReason = "In case of Virtual Threads, ThreadPerTaskExecutor is created instead of ThreadPoolExecutor") public class DefaultExecutorServiceManagerTest extends ContextTestSupport { @Test diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java index 57260b4be7d..81b279f21dd 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java @@ -30,6 +30,7 @@ import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.condition.DisabledIfSystemProperty; import org.junit.jupiter.api.parallel.Isolated; import static org.junit.jupiter.api.Assertions.fail; @@ -37,6 +38,8 @@ import static org.junit.jupiter.api.Assertions.fail; /** * Wire tap unit test */ +@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true", + disabledReason = "Tasks are not rejected when using Virtual Threads") @Isolated public class WireTapAbortPolicyTest extends ContextTestSupport { diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithNoExecutorServiceTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithNoExecutorServiceTest.java index cc838dbeb30..624bf630d7f 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithNoExecutorServiceTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithNoExecutorServiceTest.java @@ -21,9 +21,12 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledIfSystemProperty; import static org.junit.jupiter.api.Assertions.assertTrue; +@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true", + disabledReason = "In case of Virtual Threads, the threads cannot be counted this way") public class AggregateTimeoutWithNoExecutorServiceTest extends ContextTestSupport { @Test diff --git a/core/camel-management/src/test/java/org/apache/camel/management/DualManagedThreadPoolProfileTest.java b/core/camel-management/src/test/java/org/apache/camel/management/DualManagedThreadPoolProfileTest.java index a17468632cf..79cbfe181c6 100644 --- a/core/camel-management/src/test/java/org/apache/camel/management/DualManagedThreadPoolProfileTest.java +++ b/core/camel-management/src/test/java/org/apache/camel/management/DualManagedThreadPoolProfileTest.java @@ -23,12 +23,15 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.spi.ThreadPoolProfile; import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledIfSystemProperty; import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.OS; import static org.apache.camel.management.DefaultManagementObjectNameStrategy.TYPE_THREAD_POOL; import static org.junit.jupiter.api.Assertions.assertEquals; +@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true", + disabledReason = "In case of Virtual Threads, the created thread pools don't have all these attributes") @DisabledOnOs(OS.AIX) public class DualManagedThreadPoolProfileTest extends ManagementTestSupport { diff --git a/core/camel-management/src/test/java/org/apache/camel/management/DualManagedThreadPoolWithIdTest.java b/core/camel-management/src/test/java/org/apache/camel/management/DualManagedThreadPoolWithIdTest.java index 6e1364609d7..71748076d8c 100644 --- a/core/camel-management/src/test/java/org/apache/camel/management/DualManagedThreadPoolWithIdTest.java +++ b/core/camel-management/src/test/java/org/apache/camel/management/DualManagedThreadPoolWithIdTest.java @@ -21,12 +21,15 @@ import javax.management.ObjectName; import org.apache.camel.builder.RouteBuilder; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledIfSystemProperty; import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.OS; import static org.apache.camel.management.DefaultManagementObjectNameStrategy.TYPE_THREAD_POOL; import static org.junit.jupiter.api.Assertions.assertEquals; +@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true", + disabledReason = "In case of Virtual Threads, the created thread pools don't have all these attributes") @DisabledOnOs(OS.AIX) public class DualManagedThreadPoolWithIdTest extends ManagementTestSupport { diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapTest.java index 4bf24a0fe36..499a63d68cf 100644 --- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapTest.java +++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapTest.java @@ -24,6 +24,7 @@ import javax.management.ObjectName; import org.apache.camel.ServiceStatus; import org.apache.camel.builder.RouteBuilder; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledIfSystemProperty; import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.OS; @@ -32,6 +33,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true", + disabledReason = "In case of Virtual Threads, the thread pools are not ThreadPoolExecutor") @DisabledOnOs(OS.AIX) public class ManagedRouteRemoveWireTapTest extends ManagementTestSupport { diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolProfileTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolProfileTest.java index e58414bf5bc..f3b9efee03d 100644 --- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolProfileTest.java +++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolProfileTest.java @@ -23,6 +23,7 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.spi.ThreadPoolProfile; import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledIfSystemProperty; import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.OS; @@ -30,6 +31,8 @@ import static org.apache.camel.management.DefaultManagementObjectNameStrategy.TY import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true", + disabledReason = "In case of Virtual Threads, the created thread pools don't have all these attributes") @DisabledOnOs(OS.AIX) public class ManagedThreadPoolProfileTest extends ManagementTestSupport { diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java index 83cfc322b08..ce8be83dbb6 100644 --- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java +++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java @@ -28,12 +28,15 @@ import javax.management.ReflectionException; import org.apache.camel.builder.RouteBuilder; import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledIfSystemProperty; import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.OS; import static org.apache.camel.management.DefaultManagementObjectNameStrategy.TYPE_THREAD_POOL; import static org.junit.jupiter.api.Assertions.assertEquals; +@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true", + disabledReason = "In case of Virtual Threads, the created thread pools don't have all these attributes") @DisabledOnOs(OS.AIX) public class ManagedThreadPoolTest extends ManagementTestSupport { diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolWithIdTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolWithIdTest.java index de542a796ca..9116e20f8ef 100644 --- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolWithIdTest.java +++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolWithIdTest.java @@ -21,12 +21,15 @@ import javax.management.ObjectName; import org.apache.camel.builder.RouteBuilder; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledIfSystemProperty; import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.OS; import static org.apache.camel.management.DefaultManagementObjectNameStrategy.TYPE_THREAD_POOL; import static org.junit.jupiter.api.Assertions.assertEquals; +@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true", + disabledReason = "In case of Virtual Threads, the created thread pools don't have all these attributes") @DisabledOnOs(OS.AIX) public class ManagedThreadPoolWithIdTest extends ManagementTestSupport { diff --git a/core/camel-support/pom.xml b/core/camel-support/pom.xml index 32c6688c907..2ede683e498 100644 --- a/core/camel-support/pom.xml +++ b/core/camel-support/pom.xml @@ -85,4 +85,53 @@ </dependency> </dependencies> + <profiles> + <profile> + <id>java-21-sources</id> + <activation> + <jdk>[21,)</jdk> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>${maven-compiler-plugin-version}</version> + <executions> + <execution> + <id>default-compile</id> + <phase>compile</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + <execution> + <id>compile-java-21</id> + <phase>compile</phase> + <goals> + <goal>compile</goal> + </goals> + <configuration> + <release>21</release> + <compileSourceRoots>${project.basedir}/src/main/java21</compileSourceRoots> + <multiReleaseOutput>true</multiReleaseOutput> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <archive> + <manifestEntries> + <Multi-Release>true</Multi-Release> + </manifestEntries> + </archive> + </configuration> + </plugin> + </plugins> + </build> + </profile> + </profiles> </project> diff --git a/core/camel-support/src/main/java21/org/apache/camel/support/DefaultThreadPoolFactory.java b/core/camel-support/src/main/java21/org/apache/camel/support/DefaultThreadPoolFactory.java new file mode 100644 index 00000000000..8fee4c9efd8 --- /dev/null +++ b/core/camel-support/src/main/java21/org/apache/camel/support/DefaultThreadPoolFactory.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.support; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.StaticService; +import org.apache.camel.spi.ThreadPoolFactory; +import org.apache.camel.spi.ThreadPoolProfile; +import org.apache.camel.support.service.ServiceSupport; +import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor; +import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor; +import org.apache.camel.util.concurrent.SizedScheduledExecutorService; +import org.apache.camel.util.concurrent.ThreadType; +import org.apache.camel.util.concurrent.ThreadFactoryTypeAware; + +/** + * Factory for thread pools that uses the JDK {@link Executors} for creating the thread pools. + */ +public class DefaultThreadPoolFactory extends ServiceSupport implements CamelContextAware, ThreadPoolFactory, StaticService { + + private CamelContext camelContext; + + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + @Override + public ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { + return ThreadPoolFactoryType.from(threadFactory, Integer.MAX_VALUE).newCachedThreadPool(threadFactory); + } + + @Override + public ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory factory) { + // allow core thread timeout is default true if not configured + boolean allow = profile.getAllowCoreThreadTimeOut() != null ? profile.getAllowCoreThreadTimeOut() : true; + return newThreadPool(profile.getPoolSize(), + profile.getMaxPoolSize(), + profile.getKeepAliveTime(), + profile.getTimeUnit(), + profile.getMaxQueueSize(), + allow, + profile.getRejectedExecutionHandler(), + factory); + } + + public ExecutorService newThreadPool( + int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize, + boolean allowCoreThreadTimeOut, + RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory) + throws IllegalArgumentException { + // the core pool size must be 0 or higher + if (corePoolSize < 0) { + throw new IllegalArgumentException("CorePoolSize must be >= 0, was " + corePoolSize); + } + + // validate max >= core + if (maxPoolSize < corePoolSize) { + throw new IllegalArgumentException( + "MaxPoolSize must be >= corePoolSize, was " + maxPoolSize + " >= " + corePoolSize); + } + return ThreadPoolFactoryType.from(threadFactory, corePoolSize, maxPoolSize, maxQueueSize).newThreadPool( + corePoolSize, maxPoolSize, keepAliveTime, timeUnit, maxQueueSize, allowCoreThreadTimeOut, + rejectedExecutionHandler, threadFactory); + } + + @Override + public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) { + return ThreadPoolFactoryType.from(threadFactory, profile).newScheduledThreadPool(profile, threadFactory); + } + + private enum ThreadPoolFactoryType { + PLATFORM { + ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { + return Executors.newCachedThreadPool(threadFactory); + } + + ExecutorService newThreadPool( + int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize, + boolean allowCoreThreadTimeOut, + RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory) + throws IllegalArgumentException { + + BlockingQueue<Runnable> workQueue; + if (corePoolSize == 0 && maxQueueSize <= 0) { + // use a synchronous queue for direct-handover (no tasks stored on the queue) + workQueue = new SynchronousQueue<>(); + // and force 1 as pool size to be able to create the thread pool by the JDK + corePoolSize = 1; + maxPoolSize = 1; + } else if (maxQueueSize <= 0) { + // use a synchronous queue for direct-handover (no tasks stored on the queue) + workQueue = new SynchronousQueue<>(); + } else { + // bounded task queue to store tasks on the queue + workQueue = new LinkedBlockingQueue<>(maxQueueSize); + } + + ThreadPoolExecutor answer + = new RejectableThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue); + answer.setThreadFactory(threadFactory); + answer.allowCoreThreadTimeOut(allowCoreThreadTimeOut); + if (rejectedExecutionHandler == null) { + rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy(); + } + answer.setRejectedExecutionHandler(rejectedExecutionHandler); + return answer; + } + + ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) { + RejectedExecutionHandler rejectedExecutionHandler = profile.getRejectedExecutionHandler(); + if (rejectedExecutionHandler == null) { + rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy(); + } + + ScheduledThreadPoolExecutor answer + = new RejectableScheduledThreadPoolExecutor(profile.getPoolSize(), threadFactory, rejectedExecutionHandler); + answer.setRemoveOnCancelPolicy(true); + + // need to wrap the thread pool in a sized to guard against the problem that the + // JDK created thread pool has an unbounded queue (see class javadoc), which mean + // we could potentially keep adding tasks, and run out of memory. + if (profile.getMaxPoolSize() > 0) { + return new SizedScheduledExecutorService(answer, profile.getMaxQueueSize()); + } else { + return answer; + } + } + }, + VIRTUAL { + @Override + ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { + return Executors.newThreadPerTaskExecutor(threadFactory); + } + + @Override + ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, + int maxQueueSize, boolean allowCoreThreadTimeOut, + RejectedExecutionHandler rejectedExecutionHandler, + ThreadFactory threadFactory) throws IllegalArgumentException { + return Executors.newThreadPerTaskExecutor(threadFactory); + } + + @Override + ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) { + return Executors.newScheduledThreadPool(0, threadFactory); + } + }; + + static ThreadPoolFactoryType from(ThreadFactory threadFactory, ThreadPoolProfile profile) { + return from(threadFactory, profile.getPoolSize(), profile.getMaxPoolSize(), profile.getMaxQueueSize()); + } + + static ThreadPoolFactoryType from(ThreadFactory threadFactory, int corePoolSize, int maxPoolSize, int maxQueueSize) { + return from(threadFactory, corePoolSize == 0 && maxQueueSize <= 0 ? 1 : maxPoolSize); + } + + static ThreadPoolFactoryType from(ThreadFactory threadFactory, int maxPoolSize) { + if (ThreadType.current() == ThreadType.PLATFORM) { + return ThreadPoolFactoryType.PLATFORM; + } + return maxPoolSize > 1 && threadFactory instanceof ThreadFactoryTypeAware factoryTypeAware && factoryTypeAware.isVirtual() ? + ThreadPoolFactoryType.VIRTUAL : ThreadPoolFactoryType.PLATFORM; + } + + abstract ExecutorService newCachedThreadPool(ThreadFactory threadFactory); + + abstract ExecutorService newThreadPool( + int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize, + boolean allowCoreThreadTimeOut, + RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory) + throws IllegalArgumentException; + + abstract ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory); + } +} diff --git a/core/camel-util/pom.xml b/core/camel-util/pom.xml index 2933fd9d9b8..c48ad466571 100644 --- a/core/camel-util/pom.xml +++ b/core/camel-util/pom.xml @@ -272,5 +272,52 @@ </plugins> </build> </profile> + <profile> + <id>java-21-sources</id> + <activation> + <jdk>[21,)</jdk> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>${maven-compiler-plugin-version}</version> + <executions> + <execution> + <id>default-compile</id> + <phase>compile</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + <execution> + <id>compile-java-21</id> + <phase>compile</phase> + <goals> + <goal>compile</goal> + </goals> + <configuration> + <release>21</release> + <compileSourceRoots>${project.basedir}/src/main/java21</compileSourceRoots> + <multiReleaseOutput>true</multiReleaseOutput> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <archive> + <manifestEntries> + <Multi-Release>true</Multi-Release> + </manifestEntries> + </archive> + </configuration> + </plugin> + </plugins> + </build> + </profile> </profiles> </project> diff --git a/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadFactoryTypeAware.java b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadFactoryTypeAware.java new file mode 100644 index 00000000000..39a7a398768 --- /dev/null +++ b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadFactoryTypeAware.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.util.concurrent; + +import java.util.concurrent.ThreadFactory; + +/** + * The interface indicating whether the created threads are virtual or not. + */ +public interface ThreadFactoryTypeAware extends ThreadFactory { + + /** + * Indicates whether the created threads are virtual. + * + * @return {@code true} if the created threads are virtual, {@code false} if they are platform threads. + */ + boolean isVirtual(); +} diff --git a/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadType.java b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadType.java new file mode 100644 index 00000000000..b048a2640c9 --- /dev/null +++ b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadType.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.util.concurrent; + +/** + * Defines the existing type of threads. The virtual threads can only be used with JDK 21+ and the system property + * {@code camel.threads.virtual.enabled} set to {@code true}. + */ +public enum ThreadType { + PLATFORM, + VIRTUAL; + + public static ThreadType current() { + return PLATFORM; + } +} diff --git a/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/CamelThreadFactory.java b/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/CamelThreadFactory.java new file mode 100644 index 00000000000..d6c54ddc692 --- /dev/null +++ b/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/CamelThreadFactory.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.util.concurrent; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Thread factory which creates threads supporting a naming pattern. + * The factory creates virtual threads in case the System property {@code camel.threads.virtual.enabled} set to + * {@code true}. + */ +public final class CamelThreadFactory implements ThreadFactoryTypeAware { + private static final Logger LOG = LoggerFactory.getLogger(CamelThreadFactory.class); + + private static final ThreadFactoryType TYPE = ThreadFactoryType.current(); + + private final String pattern; + private final String name; + private final boolean daemon; + private final ThreadFactoryType threadType; + + public CamelThreadFactory(String pattern, String name, boolean daemon) { + this.pattern = pattern; + this.name = name; + this.daemon = daemon; + this.threadType = daemon ? TYPE : ThreadFactoryType.PLATFORM; + } + + @Override + public boolean isVirtual() { + return threadType == ThreadFactoryType.VIRTUAL; + } + + @Override + public Thread newThread(Runnable runnable) { + String threadName = ThreadHelper.resolveThreadName(pattern, name); + + Thread answer = threadType.newThread(threadName, daemon, runnable); + + LOG.trace("Created thread[{}] -> {}", threadName, answer); + return answer; + } + + public String getName() { + return name; + } + + @Override + public String toString() { + return "CamelThreadFactory[" + name + "]"; + } + + private enum ThreadFactoryType { + PLATFORM { + Thread.Builder newThreadBuilder(String threadName, boolean daemon) { + return Thread.ofPlatform().name(threadName).daemon(daemon); + } + }, + VIRTUAL { + Thread.Builder newThreadBuilder(String threadName, boolean daemon) { + return Thread.ofVirtual().name(threadName); + } + }; + + Thread newThread(String threadName, boolean daemon, Runnable runnable) { + return newThreadBuilder(threadName, daemon).unstarted(runnable); + } + + abstract Thread.Builder newThreadBuilder(String threadName, boolean daemon); + + static ThreadFactoryType current() { + return ThreadType.current() == ThreadType.VIRTUAL ? VIRTUAL : PLATFORM; + } + } +} + diff --git a/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/ThreadType.java b/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/ThreadType.java new file mode 100644 index 00000000000..0bc527ecf8e --- /dev/null +++ b/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/ThreadType.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.util.concurrent; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Defines the existing type of threads. The virtual threads can only be used with JDK 21+ and the system property + * {@code camel.threads.virtual.enabled} set to {@code true}. + * The default value is {@code false} which means that platform threads are used by default. + */ +public enum ThreadType { + PLATFORM, + VIRTUAL; + private static final Logger LOG = LoggerFactory.getLogger(ThreadType.class); + private static final ThreadType CURRENT = Boolean.getBoolean("camel.threads.virtual.enabled") ? VIRTUAL : PLATFORM; + static { + LOG.info("The type of thread detected is {}", CURRENT); + } + public static ThreadType current() { + return CURRENT; + } +} diff --git a/docs/user-manual/modules/ROOT/pages/threading-model.adoc b/docs/user-manual/modules/ROOT/pages/threading-model.adoc index b71194857be..a8782e808f3 100644 --- a/docs/user-manual/modules/ROOT/pages/threading-model.adoc +++ b/docs/user-manual/modules/ROOT/pages/threading-model.adoc @@ -241,3 +241,15 @@ you should implement and hook into the WorkManager. To hook in custom thread pool providers (e.g. for J2EE servers) a `ThreadPoolFactory` interface can be implemented. The implementation can be set in the `ExecutorServiceManager`. + +== Virtual Threads + +Starting from Java 21, the default `ThreadPoolFactory` can build `ExecutorService` and `ScheduledExecutorService` that +use https://openjdk.org/jeps/425[virtual threads] instead of platform threads. +But as it is an experimental feature, it is not enabled by default, you need to set the System property `camel.threads.virtual.enabled` +to `true` and run Camel using Java 21 or above to enable it. + +Be aware that even if it is enabled, there are some use cases where platform threads are still used, for example, if the +thread factory is configured to create non-daemon threads since virtual threads can only be daemons, or when the +`ExecutorService` or `ScheduledExecutorService` to build cannot have more than one thread or finally when `corePoolSize` +is set to zero and `maxQueueSize` is set to a value less or equal to `0`.