This is an automated email from the ASF dual-hosted git repository. nfilotto pushed a commit to branch virtual-threads-basic-support in repository https://gitbox.apache.org/repos/asf/camel.git
commit 3c70161eb902b1238c728601783a62b41f564bf5 Author: Nicolas Filotto <nfilo...@talend.com> AuthorDate: Fri Dec 1 18:13:12 2023 +0100 Add basic support of virtual threads --- core/camel-support/pom.xml | 49 +++++ .../camel/support/DefaultThreadPoolFactory.java | 213 +++++++++++++++++++++ core/camel-util/pom.xml | 47 +++++ .../apache/camel/util/concurrent/ThreadType.java | 30 +++ .../camel/util/concurrent/ThreadTypeAware.java | 23 +++ .../camel/util/concurrent/CamelThreadFactory.java | 86 +++++++++ .../apache/camel/util/concurrent/ThreadType.java | 38 ++++ 7 files changed, 486 insertions(+) diff --git a/core/camel-support/pom.xml b/core/camel-support/pom.xml index 32c6688c907..2ede683e498 100644 --- a/core/camel-support/pom.xml +++ b/core/camel-support/pom.xml @@ -85,4 +85,53 @@ </dependency> </dependencies> + <profiles> + <profile> + <id>java-21-sources</id> + <activation> + <jdk>[21,)</jdk> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>${maven-compiler-plugin-version}</version> + <executions> + <execution> + <id>default-compile</id> + <phase>compile</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + <execution> + <id>compile-java-21</id> + <phase>compile</phase> + <goals> + <goal>compile</goal> + </goals> + <configuration> + <release>21</release> + <compileSourceRoots>${project.basedir}/src/main/java21</compileSourceRoots> + <multiReleaseOutput>true</multiReleaseOutput> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <archive> + <manifestEntries> + <Multi-Release>true</Multi-Release> + </manifestEntries> + </archive> + </configuration> + </plugin> + </plugins> + </build> + </profile> + </profiles> </project> diff --git a/core/camel-support/src/main/java21/org/apache/camel/support/DefaultThreadPoolFactory.java b/core/camel-support/src/main/java21/org/apache/camel/support/DefaultThreadPoolFactory.java new file mode 100644 index 00000000000..323216f2f9d --- /dev/null +++ b/core/camel-support/src/main/java21/org/apache/camel/support/DefaultThreadPoolFactory.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.support; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.StaticService; +import org.apache.camel.spi.ThreadPoolFactory; +import org.apache.camel.spi.ThreadPoolProfile; +import org.apache.camel.support.service.ServiceSupport; +import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor; +import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor; +import org.apache.camel.util.concurrent.SizedScheduledExecutorService; +import org.apache.camel.util.concurrent.ThreadType; +import org.apache.camel.util.concurrent.ThreadTypeAware; + +/** + * Factory for thread pools that uses the JDK {@link Executors} for creating the thread pools. + */ +public class DefaultThreadPoolFactory extends ServiceSupport implements CamelContextAware, ThreadPoolFactory, StaticService, ThreadTypeAware { + + private CamelContext camelContext; + + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + @Override + public ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { + return ThreadPoolFactoryType.from(threadFactory).newCachedThreadPool(threadFactory); + } + + @Override + public ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory factory) { + return ThreadPoolFactoryType.from(factory).newThreadPool(profile, factory); + } + + public ExecutorService newThreadPool( + int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize, + boolean allowCoreThreadTimeOut, + RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory) + throws IllegalArgumentException { + + return ThreadPoolFactoryType.from(threadFactory).newThreadPool( + corePoolSize, maxPoolSize, keepAliveTime, timeUnit, maxQueueSize, allowCoreThreadTimeOut, + rejectedExecutionHandler, threadFactory); + } + + @Override + public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) { + return ThreadPoolFactoryType.from(threadFactory).newScheduledThreadPool(profile, threadFactory); + } + + private enum ThreadPoolFactoryType { + PLATFORM { + ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { + return Executors.newCachedThreadPool(threadFactory); + } + + ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory factory) { + // allow core thread timeout is default true if not configured + boolean allow = profile.getAllowCoreThreadTimeOut() != null ? profile.getAllowCoreThreadTimeOut() : true; + return newThreadPool(profile.getPoolSize(), + profile.getMaxPoolSize(), + profile.getKeepAliveTime(), + profile.getTimeUnit(), + profile.getMaxQueueSize(), + allow, + profile.getRejectedExecutionHandler(), + factory); + } + + ExecutorService newThreadPool( + int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize, + boolean allowCoreThreadTimeOut, + RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory) + throws IllegalArgumentException { + + // the core pool size must be 0 or higher + if (corePoolSize < 0) { + throw new IllegalArgumentException("CorePoolSize must be >= 0, was " + corePoolSize); + } + + // validate max >= core + if (maxPoolSize < corePoolSize) { + throw new IllegalArgumentException( + "MaxPoolSize must be >= corePoolSize, was " + maxPoolSize + " >= " + corePoolSize); + } + + BlockingQueue<Runnable> workQueue; + if (corePoolSize == 0 && maxQueueSize <= 0) { + // use a synchronous queue for direct-handover (no tasks stored on the queue) + workQueue = new SynchronousQueue<>(); + // and force 1 as pool size to be able to create the thread pool by the JDK + corePoolSize = 1; + maxPoolSize = 1; + } else if (maxQueueSize <= 0) { + // use a synchronous queue for direct-handover (no tasks stored on the queue) + workQueue = new SynchronousQueue<>(); + } else { + // bounded task queue to store tasks on the queue + workQueue = new LinkedBlockingQueue<>(maxQueueSize); + } + + ThreadPoolExecutor answer + = new RejectableThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue); + answer.setThreadFactory(threadFactory); + answer.allowCoreThreadTimeOut(allowCoreThreadTimeOut); + if (rejectedExecutionHandler == null) { + rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy(); + } + answer.setRejectedExecutionHandler(rejectedExecutionHandler); + return answer; + } + + ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) { + RejectedExecutionHandler rejectedExecutionHandler = profile.getRejectedExecutionHandler(); + if (rejectedExecutionHandler == null) { + rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy(); + } + + ScheduledThreadPoolExecutor answer + = new RejectableScheduledThreadPoolExecutor(profile.getPoolSize(), threadFactory, rejectedExecutionHandler); + answer.setRemoveOnCancelPolicy(true); + + // need to wrap the thread pool in a sized to guard against the problem that the + // JDK created thread pool has an unbounded queue (see class javadoc), which mean + // we could potentially keep adding tasks, and run out of memory. + if (profile.getMaxPoolSize() > 0) { + return new SizedScheduledExecutorService(answer, profile.getMaxQueueSize()); + } else { + return answer; + } + } + }, + VIRTUAL { + @Override + ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { + return Executors.newThreadPerTaskExecutor(threadFactory); + } + + @Override + ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory factory) { + return Executors.newThreadPerTaskExecutor(factory); + } + + @Override + ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, + int maxQueueSize, boolean allowCoreThreadTimeOut, + RejectedExecutionHandler rejectedExecutionHandler, + ThreadFactory threadFactory) throws IllegalArgumentException { + return Executors.newThreadPerTaskExecutor(threadFactory); + } + + @Override + ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) { + return Executors.newScheduledThreadPool(0, threadFactory); + } + }; + + static ThreadPoolFactoryType from(ThreadFactory threadFactory) { + if (ThreadType.current() == ThreadType.PLATFORM) { + return ThreadPoolFactoryType.PLATFORM; + } + return threadFactory instanceof ThreadTypeAware ? + ThreadPoolFactoryType.VIRTUAL : ThreadPoolFactoryType.PLATFORM; + } + + abstract ExecutorService newCachedThreadPool(ThreadFactory threadFactory); + + abstract ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory factory); + + abstract ExecutorService newThreadPool( + int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize, + boolean allowCoreThreadTimeOut, + RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory) + throws IllegalArgumentException; + + abstract ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory); + } +} diff --git a/core/camel-util/pom.xml b/core/camel-util/pom.xml index 2933fd9d9b8..c48ad466571 100644 --- a/core/camel-util/pom.xml +++ b/core/camel-util/pom.xml @@ -272,5 +272,52 @@ </plugins> </build> </profile> + <profile> + <id>java-21-sources</id> + <activation> + <jdk>[21,)</jdk> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>${maven-compiler-plugin-version}</version> + <executions> + <execution> + <id>default-compile</id> + <phase>compile</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + <execution> + <id>compile-java-21</id> + <phase>compile</phase> + <goals> + <goal>compile</goal> + </goals> + <configuration> + <release>21</release> + <compileSourceRoots>${project.basedir}/src/main/java21</compileSourceRoots> + <multiReleaseOutput>true</multiReleaseOutput> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <archive> + <manifestEntries> + <Multi-Release>true</Multi-Release> + </manifestEntries> + </archive> + </configuration> + </plugin> + </plugins> + </build> + </profile> </profiles> </project> diff --git a/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadType.java b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadType.java new file mode 100644 index 00000000000..b048a2640c9 --- /dev/null +++ b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadType.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.util.concurrent; + +/** + * Defines the existing type of threads. The virtual threads can only be used with JDK 21+ and the system property + * {@code camel.threads.virtual.enabled} set to {@code true}. + */ +public enum ThreadType { + PLATFORM, + VIRTUAL; + + public static ThreadType current() { + return PLATFORM; + } +} diff --git a/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadTypeAware.java b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadTypeAware.java new file mode 100644 index 00000000000..0148e7cdbe8 --- /dev/null +++ b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadTypeAware.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.util.concurrent; + +/** + * The marker interface indicating whether a given class is thread type aware. + */ +public interface ThreadTypeAware { +} diff --git a/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/CamelThreadFactory.java b/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/CamelThreadFactory.java new file mode 100644 index 00000000000..51156065cb1 --- /dev/null +++ b/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/CamelThreadFactory.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.util.concurrent; + +import java.util.concurrent.ThreadFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Thread factory which creates threads supporting a naming pattern. + * The factory creates virtual threads in case the System property {@code camel.threads.virtual.enabled} set to + * {@code true}. + */ +public final class CamelThreadFactory implements ThreadFactory, ThreadTypeAware { + private static final Logger LOG = LoggerFactory.getLogger(CamelThreadFactory.class); + + private static final ThreadFactoryType TYPE = ThreadFactoryType.current(); + + private final String pattern; + private final String name; + private final boolean daemon; + + public CamelThreadFactory(String pattern, String name, boolean daemon) { + this.pattern = pattern; + this.name = name; + this.daemon = daemon; + } + + @Override + public Thread newThread(Runnable runnable) { + String threadName = ThreadHelper.resolveThreadName(pattern, name); + + Thread answer = TYPE.newThread(threadName, daemon, runnable); + + LOG.trace("Created thread[{}] -> {}", threadName, answer); + return answer; + } + + public String getName() { + return name; + } + + @Override + public String toString() { + return "CamelThreadFactory[" + name + "]"; + } + + private enum ThreadFactoryType { + PLATFORM { + Thread.Builder newThreadBuilder(String threadName, boolean daemon) { + return Thread.ofPlatform().name(threadName).daemon(daemon); + } + }, + VIRTUAL { + Thread.Builder newThreadBuilder(String threadName, boolean daemon) { + return Thread.ofVirtual().name(threadName); + } + }; + + Thread newThread(String threadName, boolean daemon, Runnable runnable) { + return newThreadBuilder(threadName, daemon).unstarted(runnable); + } + + abstract Thread.Builder newThreadBuilder(String threadName, boolean daemon); + + static ThreadFactoryType current() { + return ThreadType.current() == ThreadType.VIRTUAL ? VIRTUAL : PLATFORM; + } + } +} + diff --git a/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/ThreadType.java b/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/ThreadType.java new file mode 100644 index 00000000000..0bc527ecf8e --- /dev/null +++ b/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/ThreadType.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.util.concurrent; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Defines the existing type of threads. The virtual threads can only be used with JDK 21+ and the system property + * {@code camel.threads.virtual.enabled} set to {@code true}. + * The default value is {@code false} which means that platform threads are used by default. + */ +public enum ThreadType { + PLATFORM, + VIRTUAL; + private static final Logger LOG = LoggerFactory.getLogger(ThreadType.class); + private static final ThreadType CURRENT = Boolean.getBoolean("camel.threads.virtual.enabled") ? VIRTUAL : PLATFORM; + static { + LOG.info("The type of thread detected is {}", CURRENT); + } + public static ThreadType current() { + return CURRENT; + } +}