This is an automated email from the ASF dual-hosted git repository.
Croway pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new f416cfe4101f CAMEL-23602: Honor maxQueueSize in threads EIP with
virtual threads
f416cfe4101f is described below
commit f416cfe4101f372f62ce6070e6a508691d740956
Author: Federico Mariani <[email protected]>
AuthorDate: Mon May 25 09:44:14 2026 +0200
CAMEL-23602: Honor maxQueueSize in threads EIP with virtual threads
---
.../camel/catalog/models/threadPoolProfile.json | 2 +-
.../org/apache/camel/catalog/models/threads.json | 2 +-
.../org/apache/camel/spring/xml/threadPool.json | 2 +-
.../org/apache/camel/model/threadPoolProfile.json | 2 +-
.../META-INF/org/apache/camel/model/threads.json | 2 +-
.../camel/model/ThreadPoolProfileDefinition.java | 2 +-
.../org/apache/camel/model/ThreadsDefinition.java | 2 +-
.../xml/AbstractCamelThreadPoolFactoryBean.java | 2 +-
.../camel/support/DefaultThreadPoolFactory.java | 17 +-
core/camel-util/pom.xml | 6 +
.../util/concurrent/BoundedExecutorService.java | 232 ++++++++++++++++++++
.../util/concurrent/ThreadPoolRejectedPolicy.java | 22 +-
.../concurrent/BoundedExecutorServiceTest.java | 241 +++++++++++++++++++++
.../ROOT/pages/camel-4x-upgrade-guide-4_21.adoc | 35 +++
.../modules/ROOT/pages/threading-model.adoc | 58 ++++-
.../modules/ROOT/pages/virtual-threads.adoc | 15 +-
.../dsl/yaml/deserializers/ModelDeserializers.java | 4 +-
.../resources/schema/camelYamlDsl-canonical.json | 4 +-
.../generated/resources/schema/camelYamlDsl.json | 4 +-
19 files changed, 630 insertions(+), 24 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/threadPoolProfile.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/threadPoolProfile.json
index 152d7a9935c1..fa8a0a18b9b4 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/threadPoolProfile.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/threadPoolProfile.json
@@ -22,6 +22,6 @@
"timeUnit": { "index": 7, "kind": "attribute", "displayName": "Time Unit",
"group": "advanced", "label": "advanced", "required": false, "type": "enum",
"javaType": "java.util.concurrent.TimeUnit", "enum": [ "NANOSECONDS",
"MICROSECONDS", "MILLISECONDS", "SECONDS", "MINUTES", "HOURS", "DAYS" ],
"deprecated": false, "autowired": false, "secret": false, "description": "Sets
the time unit to use for keep alive time By default SECONDS is used." },
"maxQueueSize": { "index": 8, "kind": "attribute", "displayName": "Max
Queue Size", "group": "common", "required": false, "type": "integer",
"javaType": "java.lang.Integer", "deprecated": false, "autowired": false,
"secret": false, "description": "Sets the maximum number of tasks in the work
queue. Use -1 or Integer.MAX_VALUE for an unbounded queue" },
"allowCoreThreadTimeOut": { "index": 9, "kind": "attribute",
"displayName": "Allow Core Thread Time Out", "group": "advanced", "label":
"advanced", "required": false, "type": "boolean", "javaType":
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether idle core threads is allowed to
timeout and therefore can shrink the pool size below the core pool size Is by
default true" },
- "rejectedPolicy": { "index": 10, "kind": "attribute", "displayName":
"Rejected Policy", "group": "advanced", "label": "advanced", "required": false,
"type": "enum", "javaType":
"org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort",
"CallerRuns" ], "deprecated": false, "autowired": false, "secret": false,
"description": "Sets the handler for tasks which cannot be executed by the
thread pool." }
+ "rejectedPolicy": { "index": 10, "kind": "attribute", "displayName":
"Rejected Policy", "group": "advanced", "label": "advanced", "required": false,
"type": "enum", "javaType":
"org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort",
"CallerRuns", "Block" ], "deprecated": false, "autowired": false, "secret":
false, "description": "Sets the handler for tasks which cannot be executed by
the thread pool." }
}
}
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/threads.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/threads.json
index aae4d04db0d8..cb1c9325c8ee 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/threads.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/threads.json
@@ -24,7 +24,7 @@
"maxQueueSize": { "index": 9, "kind": "attribute", "displayName": "Max
Queue Size", "group": "common", "required": false, "type": "integer",
"javaType": "java.lang.Integer", "deprecated": false, "autowired": false,
"secret": false, "description": "Sets the maximum number of tasks in the work
queue. Use -1 or Integer.MAX_VALUE for an unbounded queue" },
"allowCoreThreadTimeOut": { "index": 10, "kind": "attribute",
"displayName": "Allow Core Thread Time Out", "group": "advanced", "label":
"advanced", "required": false, "type": "boolean", "javaType":
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether idle core threads are allowed to
timeout and therefore can shrink the pool size below the core pool size Is by
default false" },
"threadName": { "index": 11, "kind": "attribute", "displayName": "Thread
Name", "group": "common", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": "Threads", "description": "Sets the thread name to use." },
- "rejectedPolicy": { "index": 12, "kind": "attribute", "displayName":
"Rejected Policy", "group": "advanced", "label": "advanced", "required": false,
"type": "enum", "javaType":
"org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort",
"CallerRuns" ], "deprecated": false, "autowired": false, "secret": false,
"description": "Sets the handler for tasks which cannot be executed by the
thread pool." },
+ "rejectedPolicy": { "index": 12, "kind": "attribute", "displayName":
"Rejected Policy", "group": "advanced", "label": "advanced", "required": false,
"type": "enum", "javaType":
"org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort",
"CallerRuns", "Block" ], "deprecated": false, "autowired": false, "secret":
false, "description": "Sets the handler for tasks which cannot be executed by
the thread pool." },
"callerRunsWhenRejected": { "index": 13, "kind": "attribute",
"displayName": "Caller Runs When Rejected", "group": "advanced", "label":
"advanced", "required": false, "type": "boolean", "javaType":
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": true, "description": "Whether or not to use as caller runs as
fallback when a task is rejected being added to the thread pool (when its
full). This is only used as fallback if no rejectedPolicy h [...]
}
}
diff --git
a/components/camel-spring-parent/camel-spring-xml/src/generated/resources/META-INF/org/apache/camel/spring/xml/threadPool.json
b/components/camel-spring-parent/camel-spring-xml/src/generated/resources/META-INF/org/apache/camel/spring/xml/threadPool.json
index ac99539187c9..46284132427c 100644
---
a/components/camel-spring-parent/camel-spring-xml/src/generated/resources/META-INF/org/apache/camel/spring/xml/threadPool.json
+++
b/components/camel-spring-parent/camel-spring-xml/src/generated/resources/META-INF/org/apache/camel/spring/xml/threadPool.json
@@ -19,7 +19,7 @@
"timeUnit": { "index": 4, "kind": "attribute", "displayName": "Time Unit",
"group": "common", "required": false, "type": "enum", "javaType":
"java.util.concurrent.TimeUnit", "enum": [ "NANOSECONDS", "MICROSECONDS",
"MILLISECONDS", "SECONDS", "MINUTES", "HOURS", "DAYS" ], "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "SECONDS", "description":
"Sets the time unit used for keep alive time" },
"maxQueueSize": { "index": 5, "kind": "attribute", "displayName": "Max
Queue Size", "group": "common", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "description": "Sets the maximum number of tasks in the work
queue. Use -1 for an unbounded queue" },
"allowCoreThreadTimeOut": { "index": 6, "kind": "attribute",
"displayName": "Allow Core Thread Time Out", "group": "common", "required":
false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false, "description":
"Sets whether to allow core threads to timeout" },
- "rejectedPolicy": { "index": 7, "kind": "attribute", "displayName":
"Rejected Policy", "group": "common", "required": false, "type": "enum",
"javaType": "org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy",
"enum": [ "Abort", "CallerRuns" ], "deprecated": false, "autowired": false,
"secret": false, "defaultValue": "CallerRuns", "description": "Sets the handler
for tasks which cannot be executed by the thread pool." },
+ "rejectedPolicy": { "index": 7, "kind": "attribute", "displayName":
"Rejected Policy", "group": "common", "required": false, "type": "enum",
"javaType": "org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy",
"enum": [ "Abort", "CallerRuns", "Block" ], "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "CallerRuns", "description": "Sets the
handler for tasks which cannot be executed by the thread pool." },
"threadName": { "index": 8, "kind": "attribute", "displayName": "Thread
Name", "group": "common", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"description": "To use a custom thread name \/ pattern" },
"scheduled": { "index": 9, "kind": "attribute", "displayName":
"Scheduled", "group": "common", "required": false, "type": "boolean",
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "Whether to use a
scheduled thread pool" },
"camelContextId": { "index": 10, "kind": "attribute", "displayName":
"Camel Context Id", "group": "common", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "description": "Id of CamelContext to use if there are
multiple CamelContexts in the same JVM" }
diff --git
a/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/threadPoolProfile.json
b/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/threadPoolProfile.json
index 152d7a9935c1..fa8a0a18b9b4 100644
---
a/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/threadPoolProfile.json
+++
b/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/threadPoolProfile.json
@@ -22,6 +22,6 @@
"timeUnit": { "index": 7, "kind": "attribute", "displayName": "Time Unit",
"group": "advanced", "label": "advanced", "required": false, "type": "enum",
"javaType": "java.util.concurrent.TimeUnit", "enum": [ "NANOSECONDS",
"MICROSECONDS", "MILLISECONDS", "SECONDS", "MINUTES", "HOURS", "DAYS" ],
"deprecated": false, "autowired": false, "secret": false, "description": "Sets
the time unit to use for keep alive time By default SECONDS is used." },
"maxQueueSize": { "index": 8, "kind": "attribute", "displayName": "Max
Queue Size", "group": "common", "required": false, "type": "integer",
"javaType": "java.lang.Integer", "deprecated": false, "autowired": false,
"secret": false, "description": "Sets the maximum number of tasks in the work
queue. Use -1 or Integer.MAX_VALUE for an unbounded queue" },
"allowCoreThreadTimeOut": { "index": 9, "kind": "attribute",
"displayName": "Allow Core Thread Time Out", "group": "advanced", "label":
"advanced", "required": false, "type": "boolean", "javaType":
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether idle core threads is allowed to
timeout and therefore can shrink the pool size below the core pool size Is by
default true" },
- "rejectedPolicy": { "index": 10, "kind": "attribute", "displayName":
"Rejected Policy", "group": "advanced", "label": "advanced", "required": false,
"type": "enum", "javaType":
"org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort",
"CallerRuns" ], "deprecated": false, "autowired": false, "secret": false,
"description": "Sets the handler for tasks which cannot be executed by the
thread pool." }
+ "rejectedPolicy": { "index": 10, "kind": "attribute", "displayName":
"Rejected Policy", "group": "advanced", "label": "advanced", "required": false,
"type": "enum", "javaType":
"org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort",
"CallerRuns", "Block" ], "deprecated": false, "autowired": false, "secret":
false, "description": "Sets the handler for tasks which cannot be executed by
the thread pool." }
}
}
diff --git
a/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/threads.json
b/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/threads.json
index aae4d04db0d8..cb1c9325c8ee 100644
---
a/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/threads.json
+++
b/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/threads.json
@@ -24,7 +24,7 @@
"maxQueueSize": { "index": 9, "kind": "attribute", "displayName": "Max
Queue Size", "group": "common", "required": false, "type": "integer",
"javaType": "java.lang.Integer", "deprecated": false, "autowired": false,
"secret": false, "description": "Sets the maximum number of tasks in the work
queue. Use -1 or Integer.MAX_VALUE for an unbounded queue" },
"allowCoreThreadTimeOut": { "index": 10, "kind": "attribute",
"displayName": "Allow Core Thread Time Out", "group": "advanced", "label":
"advanced", "required": false, "type": "boolean", "javaType":
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether idle core threads are allowed to
timeout and therefore can shrink the pool size below the core pool size Is by
default false" },
"threadName": { "index": 11, "kind": "attribute", "displayName": "Thread
Name", "group": "common", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": "Threads", "description": "Sets the thread name to use." },
- "rejectedPolicy": { "index": 12, "kind": "attribute", "displayName":
"Rejected Policy", "group": "advanced", "label": "advanced", "required": false,
"type": "enum", "javaType":
"org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort",
"CallerRuns" ], "deprecated": false, "autowired": false, "secret": false,
"description": "Sets the handler for tasks which cannot be executed by the
thread pool." },
+ "rejectedPolicy": { "index": 12, "kind": "attribute", "displayName":
"Rejected Policy", "group": "advanced", "label": "advanced", "required": false,
"type": "enum", "javaType":
"org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort",
"CallerRuns", "Block" ], "deprecated": false, "autowired": false, "secret":
false, "description": "Sets the handler for tasks which cannot be executed by
the thread pool." },
"callerRunsWhenRejected": { "index": 13, "kind": "attribute",
"displayName": "Caller Runs When Rejected", "group": "advanced", "label":
"advanced", "required": false, "type": "boolean", "javaType":
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": true, "description": "Whether or not to use as caller runs as
fallback when a task is rejected being added to the thread pool (when its
full). This is only used as fallback if no rejectedPolicy h [...]
}
}
diff --git
a/core/camel-core-model/src/main/java/org/apache/camel/model/ThreadPoolProfileDefinition.java
b/core/camel-core-model/src/main/java/org/apache/camel/model/ThreadPoolProfileDefinition.java
index 50533e929eef..1c26ee020cdf 100644
---
a/core/camel-core-model/src/main/java/org/apache/camel/model/ThreadPoolProfileDefinition.java
+++
b/core/camel-core-model/src/main/java/org/apache/camel/model/ThreadPoolProfileDefinition.java
@@ -58,7 +58,7 @@ public class ThreadPoolProfileDefinition extends
OptionalIdentifiedDefinition<Th
private String allowCoreThreadTimeOut;
@XmlAttribute
@Metadata(label = "advanced", javaType =
"org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy",
- enums = "Abort,CallerRuns")
+ enums = "Abort,CallerRuns,Block")
private String rejectedPolicy;
public ThreadPoolProfileDefinition() {
diff --git
a/core/camel-core-model/src/main/java/org/apache/camel/model/ThreadsDefinition.java
b/core/camel-core-model/src/main/java/org/apache/camel/model/ThreadsDefinition.java
index 92df53d49d0e..c8827b4e9bf4 100644
---
a/core/camel-core-model/src/main/java/org/apache/camel/model/ThreadsDefinition.java
+++
b/core/camel-core-model/src/main/java/org/apache/camel/model/ThreadsDefinition.java
@@ -67,7 +67,7 @@ public class ThreadsDefinition extends
NoOutputDefinition<ThreadsDefinition>
private String threadName;
@XmlAttribute
@Metadata(label = "advanced", javaType =
"org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy",
- enums = "Abort,CallerRuns")
+ enums = "Abort,CallerRuns,Block")
private String rejectedPolicy;
@XmlAttribute
@Metadata(label = "advanced", javaType = "java.lang.Boolean", defaultValue
= "true")
diff --git
a/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelThreadPoolFactoryBean.java
b/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelThreadPoolFactoryBean.java
index 4b73caaa1abf..a33721b1b4d6 100644
---
a/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelThreadPoolFactoryBean.java
+++
b/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelThreadPoolFactoryBean.java
@@ -58,7 +58,7 @@ public abstract class AbstractCamelThreadPoolFactoryBean
extends AbstractCamelFa
@XmlAttribute
@Metadata(description = "Sets the handler for tasks which cannot be
executed by the thread pool.",
defaultValue = "CallerRuns", javaType =
"org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy",
- enums = "Abort,CallerRuns")
+ enums = "Abort,CallerRuns,Block")
private String rejectedPolicy = ThreadPoolRejectedPolicy.CallerRuns.name();
@XmlAttribute(required = true)
@Metadata(description = "To use a custom thread name / pattern")
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultThreadPoolFactory.java
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultThreadPoolFactory.java
index 70e6f5fb5c50..33110ef99690 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultThreadPoolFactory.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultThreadPoolFactory.java
@@ -34,10 +34,12 @@ import org.apache.camel.StaticService;
import org.apache.camel.spi.ThreadPoolFactory;
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.util.concurrent.BoundedExecutorService;
import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor;
import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor;
import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
import org.apache.camel.util.concurrent.ThreadFactoryTypeAware;
+import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy;
import org.apache.camel.util.concurrent.ThreadType;
/**
@@ -64,6 +66,19 @@ public class DefaultThreadPoolFactory extends ServiceSupport
implements CamelCon
@Override
public ExecutorService newThreadPool(ThreadPoolProfile profile,
ThreadFactory factory) {
+ // Virtual threads: use the policy enum directly from the profile to
avoid reverse-mapping
+ if (profile.getMaxQueueSize() > 0
+ && ThreadPoolFactoryType.from(factory, profile) ==
ThreadPoolFactoryType.VIRTUAL) {
+ ThreadPoolRejectedPolicy policy = profile.getRejectedPolicy();
+ if (policy == null) {
+ policy = ThreadPoolRejectedPolicy.CallerRuns;
+ }
+ return new BoundedExecutorService(
+ ThreadPoolFactoryType.newThreadPerTaskExecutor(factory),
+ profile.getMaxQueueSize(),
+ profile.getKeepAliveTime(), profile.getTimeUnit(),
+ false, policy);
+ }
// allow core thread timeout is default true if not configured
boolean allow = profile.getAllowCoreThreadTimeOut() != null ?
profile.getAllowCoreThreadTimeOut() : true;
return newThreadPool(profile.getPoolSize(),
@@ -199,7 +214,7 @@ public class DefaultThreadPoolFactory extends
ServiceSupport implements CamelCon
}
@SuppressWarnings("unchecked")
- private static ExecutorService newThreadPerTaskExecutor(ThreadFactory
threadFactory) {
+ static ExecutorService newThreadPerTaskExecutor(ThreadFactory
threadFactory) {
try {
return (ExecutorService) Executors.class
.getMethod("newThreadPerTaskExecutor",
ThreadFactory.class)
diff --git a/core/camel-util/pom.xml b/core/camel-util/pom.xml
index 41a1bed2bcc9..7da60eb85dfb 100644
--- a/core/camel-util/pom.xml
+++ b/core/camel-util/pom.xml
@@ -51,6 +51,12 @@
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <version>${awaitility-version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
diff --git
a/core/camel-util/src/main/java/org/apache/camel/util/concurrent/BoundedExecutorService.java
b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/BoundedExecutorService.java
new file mode 100644
index 000000000000..6111377dcce5
--- /dev/null
+++
b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/BoundedExecutorService.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
+
+/**
+ * An {@link ExecutorService} wrapper that enforces bounded concurrency via a
{@link Semaphore}.
+ * <p>
+ * When virtual threads are enabled, Camel replaces the traditional {@link
java.util.concurrent.ThreadPoolExecutor} with
+ * {@code Executors.newThreadPerTaskExecutor()}, which accepts every task
immediately (unbounded). This wrapper limits
+ * the maximum number of tasks delegated to the underlying executor. Unlike
{@code ThreadPoolExecutor} there is no
+ * distinction between pool threads and queued tasks — the semaphore enforces
a flat concurrency cap on delegated tasks.
+ * <p>
+ * When the semaphore has no available permits, behavior depends on the
configured {@link ThreadPoolRejectedPolicy}:
+ * <ul>
+ * <li><b>CallerRuns</b> (default): blocks until a permit is available or the
timeout expires; on timeout, runs the task
+ * on the caller's thread. Tasks are never lost. Note that caller-run tasks
execute outside semaphore accounting, so
+ * total system concurrency may temporarily exceed {@code maxConcurrent}.</li>
+ * <li><b>Abort</b>: blocks until a permit is available or the timeout
expires; on timeout, throws
+ * {@link RejectedExecutionException}.</li>
+ * <li><b>Block</b>: blocks indefinitely until a permit becomes available. No
timeout, no rejection.</li>
+ * </ul>
+ * <p>
+ * <b>Caller thread blocking:</b> while waiting for a permit, the calling
thread is blocked. When callers are virtual
+ * threads this is inexpensive (the carrier thread is released). When callers
are platform threads (e.g., HTTP server
+ * threads) the blocked thread is unavailable for other work — this is
standard backpressure behavior but worth noting
+ * for capacity planning.
+ *
+ */
+public class BoundedExecutorService extends AbstractExecutorService {
+
+ private final ExecutorService delegate;
+ private final Semaphore semaphore;
+ private final int maxConcurrent;
+ private final long timeoutNanos;
+ private final ThreadPoolRejectedPolicy rejectedPolicy;
+ private final LongAdder callerRunsCount = new LongAdder();
+ private final LongAdder rejectedCount = new LongAdder();
+ private final LongAdder delegatedTaskCount = new LongAdder();
+
+ /**
+ * @param delegate the underlying executor (typically {@code
newThreadPerTaskExecutor})
+ * @param maxConcurrent the maximum number of tasks delegated to the
underlying executor concurrently
+ * @param acquireTimeout the maximum time to wait for a permit (ignored
when policy is {@code Block})
+ * @param timeUnit the time unit for {@code acquireTimeout}
+ * @param fair {@code true} for FIFO permit ordering
(predictable latency), {@code false} for barging
+ * (higher throughput)
+ * @param rejectedPolicy the policy to apply when no permit is available
+ */
+ public BoundedExecutorService(ExecutorService delegate, int maxConcurrent,
+ long acquireTimeout, TimeUnit timeUnit,
+ boolean fair, ThreadPoolRejectedPolicy
rejectedPolicy) {
+ this.delegate = delegate;
+ this.maxConcurrent = maxConcurrent;
+ this.semaphore = new Semaphore(maxConcurrent, fair);
+ this.timeoutNanos = timeUnit.toNanos(acquireTimeout);
+ this.rejectedPolicy = rejectedPolicy;
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ if (delegate.isShutdown()) {
+ throw new RejectedExecutionException("Executor has been shut
down");
+ }
+
+ boolean acquired = false;
+ try {
+ if (rejectedPolicy == ThreadPoolRejectedPolicy.Block) {
+ semaphore.acquire();
+ acquired = true;
+ } else {
+ acquired = semaphore.tryAcquire(timeoutNanos,
TimeUnit.NANOSECONDS);
+ }
+
+ if (!acquired) {
+ if (rejectedPolicy == ThreadPoolRejectedPolicy.CallerRuns) {
+ callerRunsCount.increment();
+ command.run();
+ return;
+ }
+ rejectedCount.increment();
+ throw new RejectedExecutionException("Executor saturated:
timed out waiting for a permit");
+ }
+
+ boolean submitted = false;
+ try {
+ delegate.execute(() -> {
+ try {
+ command.run();
+ } finally {
+ delegatedTaskCount.increment();
+ semaphore.release();
+ }
+ });
+ submitted = true;
+ } finally {
+ if (!submitted) {
+ semaphore.release();
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RejectedExecutionException("Interrupted while waiting
for permit", e);
+ }
+ }
+
+ // -- Metrics --
+
+ /**
+ * The maximum number of tasks that can be delegated to the underlying
executor concurrently. CallerRuns tasks
+ * execute outside this limit.
+ */
+ public int getMaxConcurrent() {
+ return maxConcurrent;
+ }
+
+ /**
+ * The number of permits currently available.
+ */
+ public int getAvailablePermits() {
+ return semaphore.availablePermits();
+ }
+
+ /**
+ * The number of tasks currently delegated to the underlying executor.
+ */
+ public int getActiveCount() {
+ return maxConcurrent - semaphore.availablePermits();
+ }
+
+ /**
+ * The number of threads currently blocked waiting for a permit.
+ */
+ public int getWaitingCount() {
+ return semaphore.getQueueLength();
+ }
+
+ /**
+ * The number of times the timeout expired and a task fell back to running
on the caller's thread.
+ */
+ public long getCallerRunsCount() {
+ return callerRunsCount.sum();
+ }
+
+ /**
+ * The number of tasks rejected because no permit was available within the
timeout.
+ */
+ public long getRejectedCount() {
+ return rejectedCount.sum();
+ }
+
+ /**
+ * The total number of tasks that completed via the underlying executor
(excludes caller-runs).
+ */
+ public long getDelegatedTaskCount() {
+ return delegatedTaskCount.sum();
+ }
+
+ @Override
+ protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+ if (runnable instanceof Rejectable) {
+ return new RejectableFutureTask<>(runnable, value);
+ }
+ return super.newTaskFor(runnable, value);
+ }
+
+ @Override
+ protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+ if (callable instanceof Rejectable) {
+ return new RejectableFutureTask<>(callable);
+ }
+ return super.newTaskFor(callable);
+ }
+
+ @Override
+ public void shutdown() {
+ delegate.shutdown();
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ return delegate.shutdownNow();
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return delegate.isShutdown();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return delegate.isTerminated();
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws
InterruptedException {
+ return delegate.awaitTermination(timeout, unit);
+ }
+
+ @Override
+ public String toString() {
+ return "BoundedExecutorService[active=" + getActiveCount()
+ + ", max=" + maxConcurrent
+ + ", waiting=" + getWaitingCount()
+ + ", callerRuns=" + callerRunsCount.sum()
+ + ", rejected=" + rejectedCount.sum()
+ + ", delegated=" + delegatedTaskCount.sum() + "]";
+ }
+}
diff --git
a/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadPoolRejectedPolicy.java
b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadPoolRejectedPolicy.java
index 1d0f47a5bb09..113ac74f69de 100644
---
a/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadPoolRejectedPolicy.java
+++
b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadPoolRejectedPolicy.java
@@ -31,7 +31,8 @@ import java.util.concurrent.ThreadPoolExecutor;
public enum ThreadPoolRejectedPolicy {
Abort,
- CallerRuns;
+ CallerRuns,
+ Block;
public RejectedExecutionHandler asRejectedExecutionHandler() {
if (this == Abort) {
@@ -57,6 +58,25 @@ public enum ThreadPoolRejectedPolicy {
return "CallerRuns";
}
};
+ } else if (this == Block) {
+ return new RejectedExecutionHandler() {
+ @Override
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor
executor) {
+ if (!executor.isShutdown()) {
+ try {
+ executor.getQueue().put(r);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RejectedExecutionException("Interrupted
while waiting for queue space", e);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Block";
+ }
+ };
}
throw new IllegalArgumentException("Unknown ThreadPoolRejectedPolicy:
" + this);
}
diff --git
a/core/camel-util/src/test/java/org/apache/camel/util/concurrent/BoundedExecutorServiceTest.java
b/core/camel-util/src/test/java/org/apache/camel/util/concurrent/BoundedExecutorServiceTest.java
new file mode 100644
index 000000000000..1b58012f35d9
--- /dev/null
+++
b/core/camel-util/src/test/java/org/apache/camel/util/concurrent/BoundedExecutorServiceTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.jupiter.api.Test;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class BoundedExecutorServiceTest {
+
+ @Test
+ public void testCallerRunsOnTimeout() throws Exception {
+ var delegate = Executors.newCachedThreadPool();
+ var sized = new BoundedExecutorService(
+ delegate, 1, 200, TimeUnit.MILLISECONDS, false,
ThreadPoolRejectedPolicy.CallerRuns);
+ try {
+ CountDownLatch blockTask = new CountDownLatch(1);
+ sized.execute(() -> {
+ try {
+ blockTask.await(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+
+ AtomicBoolean ranOnCallerThread = new AtomicBoolean();
+ String callerName = Thread.currentThread().getName();
+
+ sized.execute(() -> ranOnCallerThread
+ .set(Thread.currentThread().getName().equals(callerName)));
+
+ assertTrue(ranOnCallerThread.get(),
+ "After timeout, task should run on the caller's thread");
+ assertEquals(1, sized.getCallerRunsCount());
+ assertEquals(0, sized.getRejectedCount());
+
+ blockTask.countDown();
+ } finally {
+ sized.shutdown();
+ delegate.shutdown();
+ }
+ }
+
+ @Test
+ public void testAbortOnTimeout() throws Exception {
+ var delegate = Executors.newCachedThreadPool();
+ var sized = new BoundedExecutorService(
+ delegate, 1, 200, TimeUnit.MILLISECONDS, false,
ThreadPoolRejectedPolicy.Abort);
+ try {
+ CountDownLatch blockTask = new CountDownLatch(1);
+ sized.execute(() -> {
+ try {
+ blockTask.await(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+
+ assertThrows(RejectedExecutionException.class,
+ () -> sized.execute(() -> {
+ }),
+ "Should reject after timeout with Abort policy");
+ assertEquals(0, sized.getCallerRunsCount());
+ assertEquals(1, sized.getRejectedCount());
+
+ blockTask.countDown();
+ } finally {
+ sized.shutdown();
+ delegate.shutdown();
+ }
+ }
+
+ @Test
+ public void testBlockForeverPolicy() throws Exception {
+ var delegate = Executors.newCachedThreadPool();
+ var sized = new BoundedExecutorService(
+ delegate, 1, 60, TimeUnit.SECONDS, false,
ThreadPoolRejectedPolicy.Block);
+ try {
+ CountDownLatch blockTask = new CountDownLatch(1);
+ CountDownLatch secondStarted = new CountDownLatch(1);
+ CountDownLatch submitterBlocked = new CountDownLatch(1);
+
+ sized.execute(() -> {
+ try {
+ blockTask.await(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+
+ Thread submitter = new Thread(() -> {
+ submitterBlocked.countDown();
+ sized.execute(secondStarted::countDown);
+ });
+ submitter.start();
+
+ assertTrue(submitterBlocked.await(5, TimeUnit.SECONDS));
+ await().atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertTrue(sized.getWaitingCount() >
0,
+ "Submitter should be blocked waiting for a
permit"));
+
+ blockTask.countDown();
+
+ assertTrue(secondStarted.await(5, TimeUnit.SECONDS),
+ "Second task should run after first completes");
+ submitter.join(5000);
+ } finally {
+ sized.shutdown();
+ delegate.shutdown();
+ }
+ }
+
+ @Test
+ public void testConcurrencyBounded() throws Exception {
+ var delegate = Executors.newCachedThreadPool();
+ int maxConcurrent = 3;
+ var sized = new BoundedExecutorService(
+ delegate, maxConcurrent, 60, TimeUnit.SECONDS, false,
ThreadPoolRejectedPolicy.Block);
+ try {
+ AtomicInteger inFlight = new AtomicInteger();
+ AtomicInteger peak = new AtomicInteger();
+ int totalTasks = 20;
+ CountDownLatch holdTasks = new CountDownLatch(1);
+ CountDownLatch allDone = new CountDownLatch(totalTasks);
+
+ ExecutorService senders = Executors.newFixedThreadPool(totalTasks);
+ for (int i = 0; i < totalTasks; i++) {
+ senders.submit(() -> sized.execute(() -> {
+ int current = inFlight.incrementAndGet();
+ peak.accumulateAndGet(current, Math::max);
+ try {
+ holdTasks.await(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ inFlight.decrementAndGet();
+ allDone.countDown();
+ }));
+ }
+
+ await().atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertEquals(maxConcurrent,
sized.getActiveCount()));
+
+ holdTasks.countDown();
+ assertTrue(allDone.await(30, TimeUnit.SECONDS), "All tasks should
complete");
+ assertTrue(peak.get() <= maxConcurrent,
+ "Peak concurrency (" + peak.get() + ") should be <= " +
maxConcurrent);
+ senders.shutdown();
+ } finally {
+ sized.shutdown();
+ delegate.shutdown();
+ }
+ }
+
+ @Test
+ public void testPermitsReleasedAfterCompletion() throws Exception {
+ var delegate = Executors.newCachedThreadPool();
+ var sized = new BoundedExecutorService(
+ delegate, 2, 60, TimeUnit.SECONDS, false,
ThreadPoolRejectedPolicy.Block);
+ try {
+ CountDownLatch firstBatch = new CountDownLatch(2);
+ for (int i = 0; i < 2; i++) {
+ sized.execute(firstBatch::countDown);
+ }
+ assertTrue(firstBatch.await(5, TimeUnit.SECONDS), "First batch
should complete");
+
+ await().atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertEquals(2,
sized.getAvailablePermits()));
+
+ CountDownLatch secondBatch = new CountDownLatch(2);
+ for (int i = 0; i < 2; i++) {
+ sized.execute(secondBatch::countDown);
+ }
+ assertTrue(secondBatch.await(5, TimeUnit.SECONDS),
+ "Second batch should succeed after permits are released");
+ } finally {
+ sized.shutdown();
+ delegate.shutdown();
+ }
+ }
+
+ @Test
+ public void testSubmitAfterShutdown() {
+ var delegate = Executors.newCachedThreadPool();
+ var sized = new BoundedExecutorService(
+ delegate, 5, 60, TimeUnit.SECONDS, false,
ThreadPoolRejectedPolicy.CallerRuns);
+
+ sized.shutdown();
+ assertTrue(sized.isShutdown());
+ assertTrue(delegate.isShutdown());
+
+ assertThrows(RejectedExecutionException.class,
+ () -> sized.execute(() -> {
+ }),
+ "Should reject after shutdown");
+ }
+
+ @Test
+ public void testSubmitReturnsFuture() throws Exception {
+ var delegate = Executors.newCachedThreadPool();
+ var sized = new BoundedExecutorService(
+ delegate, 5, 60, TimeUnit.SECONDS, false,
ThreadPoolRejectedPolicy.CallerRuns);
+ try {
+ AtomicBoolean executed = new AtomicBoolean();
+ Future<?> future = sized.submit(() -> executed.set(true));
+
+ future.get(5, TimeUnit.SECONDS);
+ assertTrue(executed.get(), "Task submitted via submit() should
execute");
+ assertTrue(future.isDone());
+ } finally {
+ sized.shutdown();
+ delegate.shutdown();
+ }
+ }
+}
diff --git
a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_21.adoc
b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_21.adoc
index b2a8f57fddf4..da3bd2aae50e 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_21.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_21.adoc
@@ -28,6 +28,21 @@ your own code or tooling, add `org.jspecify:jspecify`
explicitly to your project
The `org.apache.camel.support.DefaultHeaderFilterStrategy` changed default
setting for lowercase from `false` to `true`.
+==== Virtual threads: `maxQueueSize` now honored in `threads()` EIP
+
+When virtual threads are enabled (`camel.threads.virtual.enabled=true`), the
`threads()` EIP now honors `maxQueueSize`
+for backpressure. Previously, `maxQueueSize` was silently ignored and tasks
were accepted unboundedly.
+
+The virtual thread executor is wrapped with a semaphore-based concurrency
limit (`BoundedExecutorService`) that enforces
+a flat cap of `maxQueueSize` on delegated tasks. The `keepAliveTime` parameter
is repurposed as the
+semaphore acquisition timeout (pool sizing parameters are not applicable to
virtual threads).
+
+==== New `Block` rejected policy
+
+A new `Block` value has been added to `ThreadPoolRejectedPolicy`. With
`Block`, the caller blocks indefinitely until
+capacity becomes available. This applies to both platform and virtual threads.
The existing `CallerRuns` (default) and
+`Abort` policies are unchanged.
+
The type converters for Java serialized objects with types
`java.io.ObjectInput` and `java.io.ObjectOutput` has been removed.
Java object serialization is a recurring source of security issues and
therefore these converters has been removed.
These converters are not used at all by Camel itself. To restore compatibility
then end users can add these type converters back as custom converters in their
own Camel applications.
@@ -67,6 +82,26 @@ auto-disables `contentCache` on resource-based components
(such as `xslt`) whose
the route. Set `camel.component.<name>.contentCache=true` (or pass
`?contentCache=true` on the
URI) to opt back in to caching during dev mode.
+==== Unified `--packaging` option for `camel export`
+
+A new `--packaging` option has been added to `camel export` that works across
all three runtimes
+(Camel Main, Spring Boot, and Quarkus). It replaces the Quarkus-specific
`--quarkus-package-type`
+option, which is now deprecated.
+
+Accepted values:
+
+- `layered` or `fast-jar` — container-optimized packaging with separate
dependency layers (default)
+- `fat-jar` or `uber-jar` — single executable JAR
+
+The default is `layered`, which produces Dockerfiles optimized for container
image layer caching.
+Each runtime implements layered packaging using its native mechanism:
+
+- **Camel Main**: thin JAR with dependencies in a `lib/` folder
+- **Spring Boot**: multi-stage Dockerfile using Spring Boot's built-in layer
extraction
+- **Quarkus**: fast-jar packaging (unchanged from the previous
`--quarkus-package-type=fast-jar` default)
+
+The deprecated `--quarkus-package-type` option continues to work for backward
compatibility.
+
==== Improved default `--quarkus-version`
The default behavior of commands requiring Quarkus Platform version has
improved.
diff --git a/docs/user-manual/modules/ROOT/pages/threading-model.adoc
b/docs/user-manual/modules/ROOT/pages/threading-model.adoc
index fb2afc1ed5b5..b419a3a1410a 100644
--- a/docs/user-manual/modules/ROOT/pages/threading-model.adoc
+++ b/docs/user-manual/modules/ROOT/pages/threading-model.adoc
@@ -28,8 +28,8 @@ The default profile is pre-configured out of the box with the
following settings
| *maxPoolSize* | `20` | Sets the default maximum pool size
| *maxQueueSize* | `1000` | Sets the default maximum number of tasks in the
work queue. Use -1 for an unbounded queue.
| *allowCoreThreadTimeOut* | `true` | Sets default whether to allow core
threads to timeout
-| *rejectedPolicy* | `CallerRuns` | Sets the default handler for tasks which
cannot be executed by the thread pool. Has four options:
-`Abort, CallerRuns, Discard, DiscardOldest` which corresponds to the same four
options provided out of the box in the JDK.
+| *rejectedPolicy* | `CallerRuns` | Sets the default handler for tasks which
cannot be executed by the thread pool. Has three options:
+`Abort`, `CallerRuns`, `Block`. See <<rejected-policy>> for details.
|===
What that means is that for example when you use
@@ -310,15 +310,63 @@ To hook in custom thread pool providers a
`ThreadPoolFactory` interface can be implemented. The implementation can
be set in the `ExecutorServiceManager`.
+[[rejected-policy]]
+== Rejected Policy
+
+The `rejectedPolicy` option controls what happens when a thread pool cannot
accept a new task
+(i.e., the pool and its work queue are full). The available policies are:
+
+[width="100%",cols="20%,80%",options="header"]
+|===
+| Policy | Description
+| *CallerRuns* | The task runs on the caller's thread. This provides natural
backpressure — the caller is blocked
+doing useful work and cannot submit more tasks until it finishes. Tasks are
never lost. This is the default.
+| *Abort* | The task is rejected with a `RejectedExecutionException`. Use this
for HTTP APIs or latency-sensitive
+systems where failing fast is preferred over blocking.
+| *Block* | The caller blocks indefinitely until capacity becomes available.
No timeout, no rejection. Use this for
+message broker consumers and batch workloads where losing a task is
unacceptable and latency is less critical.
+|===
+
+With platform threads, these policies apply when the `ThreadPoolExecutor` work
queue is full. With virtual threads,
+the same policies apply when the concurrency semaphore has no available
permits (see <<virtual-threads>>).
+
+[[virtual-threads]]
== Virtual Threads
Starting from Java 21, the default `ThreadPoolFactory` can build
`ExecutorService` and `ScheduledExecutorService` that
-use https://openjdk.org/jeps/425[virtual threads] instead of platform threads.
+use https://openjdk.org/jeps/444[virtual threads] instead of platform threads.
-But as it is an experimental feature, it is not enabled by default, you need
to set the System property `camel.threads.virtual.enabled`
-to `true` and run Camel using Java 21 or above to enable it.
+To enable virtual threads, set the System property
`camel.threads.virtual.enabled` to `true` and run Camel using
+Java 21 or above.
Be aware that even if it is enabled, there are some use cases where platform
threads are still used, for example, if the
thread factory is configured to create non-daemon threads since virtual
threads can only be daemons, or when the
`ExecutorService` or `ScheduledExecutorService` to build cannot have more than
one thread or finally when `corePoolSize`
is set to zero and `maxQueueSize` is set to a value less or equal to `0`.
+
+=== Bounded Concurrency with Virtual Threads
+
+When `maxQueueSize` is set to a positive value, Camel wraps the virtual thread
executor with a semaphore-based
+concurrency limit. This ensures that `maxQueueSize` is honored for
backpressure, even though virtual threads do not
+use a traditional work queue.
+
+Unlike `ThreadPoolExecutor` where pool threads and queued tasks are distinct
concepts, the virtual thread executor
+enforces a flat concurrency cap: the maximum number of concurrently executing
tasks equals `maxQueueSize`.
+Pool sizing parameters (`poolSize`, `maxPoolSize`) are ignored since virtual
threads are not pooled.
+All permitted tasks execute immediately on virtual threads — there is no queue
of waiting tasks.
+
+The `rejectedPolicy` controls what happens when the concurrency limit is
reached:
+
+* *CallerRuns* (default): the caller blocks up to `keepAliveTime` waiting for
a permit. If the timeout expires,
+the task runs on the caller's thread.
+* *Abort*: the caller blocks up to `keepAliveTime` waiting for a permit. If
the timeout expires,
+a `RejectedExecutionException` is thrown.
+* *Block*: the caller blocks indefinitely until a permit becomes available.
+
+While waiting for a permit, the calling thread is blocked. When callers are
virtual threads this is inexpensive
+(the carrier thread is released). When callers are platform threads (e.g.,
HTTP server threads) the blocked thread
+is unavailable for other work.
+
+Pool sizing parameters (`poolSize`, `maxPoolSize`, `keepAliveTime`) do not
control thread reuse with virtual threads
+since virtual threads are cheap to create and are never pooled. However,
`keepAliveTime` is reused as the timeout
+for permit acquisition with `CallerRuns` and `Abort` policies.
diff --git a/docs/user-manual/modules/ROOT/pages/virtual-threads.adoc
b/docs/user-manual/modules/ROOT/pages/virtual-threads.adoc
index 06d8501a018e..0991ff14b7b4 100644
--- a/docs/user-manual/modules/ROOT/pages/virtual-threads.adoc
+++ b/docs/user-manual/modules/ROOT/pages/virtual-threads.adoc
@@ -91,15 +91,24 @@ When virtual threads are enabled, Camel's
`DefaultThreadPoolFactory` (JDK 21+ va
| `Executors.newCachedThreadPool()`
| `Executors.newThreadPerTaskExecutor()`
-| `newThreadPool()` (poolSize > 1)
-| `ThreadPoolExecutor`
-| `Executors.newThreadPerTaskExecutor()`
+| `newThreadPool()` (maxQueueSize > 0)
+| `ThreadPoolExecutor` with bounded queue
+| `Executors.newThreadPerTaskExecutor()` wrapped with semaphore-based
concurrency limit
+
+| `newThreadPool()` (maxQueueSize ≤ 0)
+| `ThreadPoolExecutor` with `SynchronousQueue`
+| `Executors.newThreadPerTaskExecutor()` (unbounded)
| `newScheduledThreadPool()`
| `ScheduledThreadPoolExecutor`
| `Executors.newScheduledThreadPool(0, factory)`
|===
+When `maxQueueSize` is set to a positive value, the virtual thread executor is
wrapped with a semaphore that enforces
+a flat concurrency cap of `maxQueueSize`. Unlike `ThreadPoolExecutor` where
pool threads and queued tasks
+are distinct, all permitted tasks execute immediately on virtual threads. The
`rejectedPolicy` controls what happens
+when the concurrency limit is reached — see
xref:threading-model.adoc#rejected-policy[Rejected Policy] for details.
+
[NOTE]
====
Single-threaded executors and scheduled tasks still use platform threads, as
virtual threads are optimized for concurrent I/O-bound work, not scheduled or
sequential tasks.
diff --git
a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
index 4d3797581b51..45bf777d70f5 100644
---
a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
+++
b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
@@ -17646,7 +17646,7 @@ public final class ModelDeserializers extends
YamlDeserializerSupport {
@YamlProperty(name = "maxQueueSize", type = "number",
description = "Sets the maximum number of tasks in the work queue. Use -1 or
Integer.MAX_VALUE for an unbounded queue", displayName = "Max Queue Size"),
@YamlProperty(name = "note", type = "string", description
= "Sets the note of this node", displayName = "Note"),
@YamlProperty(name = "poolSize", type = "number",
description = "Sets the core pool size", displayName = "Pool Size"),
- @YamlProperty(name = "rejectedPolicy", type =
"enum:Abort,CallerRuns", description = "Sets the handler for tasks which cannot
be executed by the thread pool.", displayName = "Rejected Policy"),
+ @YamlProperty(name = "rejectedPolicy", type =
"enum:Abort,CallerRuns,Block", description = "Sets the handler for tasks which
cannot be executed by the thread pool.", displayName = "Rejected Policy"),
@YamlProperty(name = "timeUnit", type =
"enum:NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS",
description = "Sets the time unit to use for keep alive time By default SECONDS
is used.", displayName = "Time Unit")
}
)
@@ -17747,7 +17747,7 @@ public final class ModelDeserializers extends
YamlDeserializerSupport {
@YamlProperty(name = "maxQueueSize", type = "number",
description = "Sets the maximum number of tasks in the work queue. Use -1 or
Integer.MAX_VALUE for an unbounded queue", displayName = "Max Queue Size"),
@YamlProperty(name = "note", type = "string", description
= "Sets the note of this node", displayName = "Note"),
@YamlProperty(name = "poolSize", type = "number",
description = "Sets the core pool size", displayName = "Pool Size"),
- @YamlProperty(name = "rejectedPolicy", type =
"enum:Abort,CallerRuns", description = "Sets the handler for tasks which cannot
be executed by the thread pool.", displayName = "Rejected Policy"),
+ @YamlProperty(name = "rejectedPolicy", type =
"enum:Abort,CallerRuns,Block", description = "Sets the handler for tasks which
cannot be executed by the thread pool.", displayName = "Rejected Policy"),
@YamlProperty(name = "threadName", type = "string",
defaultValue = "Threads", description = "Sets the thread name to use.",
displayName = "Thread Name"),
@YamlProperty(name = "timeUnit", type =
"enum:NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS",
description = "Sets the keep alive time unit. By default SECONDS is used.",
displayName = "Time Unit")
}
diff --git
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl-canonical.json
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl-canonical.json
index eaaeefe3187f..8438e7a74b37 100644
---
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl-canonical.json
+++
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl-canonical.json
@@ -4842,7 +4842,7 @@
"type" : "string",
"title" : "Rejected Policy",
"description" : "Sets the handler for tasks which cannot be
executed by the thread pool.",
- "enum" : [ "Abort", "CallerRuns" ]
+ "enum" : [ "Abort", "CallerRuns", "Block" ]
},
"timeUnit" : {
"type" : "string",
@@ -4920,7 +4920,7 @@
"type" : "string",
"title" : "Rejected Policy",
"description" : "Sets the handler for tasks which cannot be
executed by the thread pool.",
- "enum" : [ "Abort", "CallerRuns" ]
+ "enum" : [ "Abort", "CallerRuns", "Block" ]
},
"threadName" : {
"type" : "string",
diff --git
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json
index 26fe66fb3b61..9baa494c457e 100644
---
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json
+++
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json
@@ -7528,7 +7528,7 @@
"type" : "string",
"title" : "Rejected Policy",
"description" : "Sets the handler for tasks which cannot be
executed by the thread pool.",
- "enum" : [ "Abort", "CallerRuns" ]
+ "enum" : [ "Abort", "CallerRuns", "Block" ]
},
"timeUnit" : {
"type" : "string",
@@ -7606,7 +7606,7 @@
"type" : "string",
"title" : "Rejected Policy",
"description" : "Sets the handler for tasks which cannot be
executed by the thread pool.",
- "enum" : [ "Abort", "CallerRuns" ]
+ "enum" : [ "Abort", "CallerRuns", "Block" ]
},
"threadName" : {
"type" : "string",