This is an automated email from the ASF dual-hosted git repository.

Croway pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new f416cfe4101f CAMEL-23602: Honor maxQueueSize in threads EIP with 
virtual threads
f416cfe4101f is described below

commit f416cfe4101f372f62ce6070e6a508691d740956
Author: Federico Mariani <[email protected]>
AuthorDate: Mon May 25 09:44:14 2026 +0200

    CAMEL-23602: Honor maxQueueSize in threads EIP with virtual threads
---
 .../camel/catalog/models/threadPoolProfile.json    |   2 +-
 .../org/apache/camel/catalog/models/threads.json   |   2 +-
 .../org/apache/camel/spring/xml/threadPool.json    |   2 +-
 .../org/apache/camel/model/threadPoolProfile.json  |   2 +-
 .../META-INF/org/apache/camel/model/threads.json   |   2 +-
 .../camel/model/ThreadPoolProfileDefinition.java   |   2 +-
 .../org/apache/camel/model/ThreadsDefinition.java  |   2 +-
 .../xml/AbstractCamelThreadPoolFactoryBean.java    |   2 +-
 .../camel/support/DefaultThreadPoolFactory.java    |  17 +-
 core/camel-util/pom.xml                            |   6 +
 .../util/concurrent/BoundedExecutorService.java    | 232 ++++++++++++++++++++
 .../util/concurrent/ThreadPoolRejectedPolicy.java  |  22 +-
 .../concurrent/BoundedExecutorServiceTest.java     | 241 +++++++++++++++++++++
 .../ROOT/pages/camel-4x-upgrade-guide-4_21.adoc    |  35 +++
 .../modules/ROOT/pages/threading-model.adoc        |  58 ++++-
 .../modules/ROOT/pages/virtual-threads.adoc        |  15 +-
 .../dsl/yaml/deserializers/ModelDeserializers.java |   4 +-
 .../resources/schema/camelYamlDsl-canonical.json   |   4 +-
 .../generated/resources/schema/camelYamlDsl.json   |   4 +-
 19 files changed, 630 insertions(+), 24 deletions(-)

diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/threadPoolProfile.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/threadPoolProfile.json
index 152d7a9935c1..fa8a0a18b9b4 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/threadPoolProfile.json
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/threadPoolProfile.json
@@ -22,6 +22,6 @@
     "timeUnit": { "index": 7, "kind": "attribute", "displayName": "Time Unit", 
"group": "advanced", "label": "advanced", "required": false, "type": "enum", 
"javaType": "java.util.concurrent.TimeUnit", "enum": [ "NANOSECONDS", 
"MICROSECONDS", "MILLISECONDS", "SECONDS", "MINUTES", "HOURS", "DAYS" ], 
"deprecated": false, "autowired": false, "secret": false, "description": "Sets 
the time unit to use for keep alive time By default SECONDS is used." },
     "maxQueueSize": { "index": 8, "kind": "attribute", "displayName": "Max 
Queue Size", "group": "common", "required": false, "type": "integer", 
"javaType": "java.lang.Integer", "deprecated": false, "autowired": false, 
"secret": false, "description": "Sets the maximum number of tasks in the work 
queue. Use -1 or Integer.MAX_VALUE for an unbounded queue" },
     "allowCoreThreadTimeOut": { "index": 9, "kind": "attribute", 
"displayName": "Allow Core Thread Time Out", "group": "advanced", "label": 
"advanced", "required": false, "type": "boolean", "javaType": 
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Whether idle core threads is allowed to 
timeout and therefore can shrink the pool size below the core pool size Is by 
default true" },
-    "rejectedPolicy": { "index": 10, "kind": "attribute", "displayName": 
"Rejected Policy", "group": "advanced", "label": "advanced", "required": false, 
"type": "enum", "javaType": 
"org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort", 
"CallerRuns" ], "deprecated": false, "autowired": false, "secret": false, 
"description": "Sets the handler for tasks which cannot be executed by the 
thread pool." }
+    "rejectedPolicy": { "index": 10, "kind": "attribute", "displayName": 
"Rejected Policy", "group": "advanced", "label": "advanced", "required": false, 
"type": "enum", "javaType": 
"org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort", 
"CallerRuns", "Block" ], "deprecated": false, "autowired": false, "secret": 
false, "description": "Sets the handler for tasks which cannot be executed by 
the thread pool." }
   }
 }
diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/threads.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/threads.json
index aae4d04db0d8..cb1c9325c8ee 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/threads.json
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/threads.json
@@ -24,7 +24,7 @@
     "maxQueueSize": { "index": 9, "kind": "attribute", "displayName": "Max 
Queue Size", "group": "common", "required": false, "type": "integer", 
"javaType": "java.lang.Integer", "deprecated": false, "autowired": false, 
"secret": false, "description": "Sets the maximum number of tasks in the work 
queue. Use -1 or Integer.MAX_VALUE for an unbounded queue" },
     "allowCoreThreadTimeOut": { "index": 10, "kind": "attribute", 
"displayName": "Allow Core Thread Time Out", "group": "advanced", "label": 
"advanced", "required": false, "type": "boolean", "javaType": 
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Whether idle core threads are allowed to 
timeout and therefore can shrink the pool size below the core pool size Is by 
default false" },
     "threadName": { "index": 11, "kind": "attribute", "displayName": "Thread 
Name", "group": "common", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "Threads", "description": "Sets the thread name to use." },
-    "rejectedPolicy": { "index": 12, "kind": "attribute", "displayName": 
"Rejected Policy", "group": "advanced", "label": "advanced", "required": false, 
"type": "enum", "javaType": 
"org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort", 
"CallerRuns" ], "deprecated": false, "autowired": false, "secret": false, 
"description": "Sets the handler for tasks which cannot be executed by the 
thread pool." },
+    "rejectedPolicy": { "index": 12, "kind": "attribute", "displayName": 
"Rejected Policy", "group": "advanced", "label": "advanced", "required": false, 
"type": "enum", "javaType": 
"org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort", 
"CallerRuns", "Block" ], "deprecated": false, "autowired": false, "secret": 
false, "description": "Sets the handler for tasks which cannot be executed by 
the thread pool." },
     "callerRunsWhenRejected": { "index": 13, "kind": "attribute", 
"displayName": "Caller Runs When Rejected", "group": "advanced", "label": 
"advanced", "required": false, "type": "boolean", "javaType": 
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": true, "description": "Whether or not to use as caller runs as 
fallback when a task is rejected being added to the thread pool (when its 
full). This is only used as fallback if no rejectedPolicy h [...]
   }
 }
diff --git 
a/components/camel-spring-parent/camel-spring-xml/src/generated/resources/META-INF/org/apache/camel/spring/xml/threadPool.json
 
b/components/camel-spring-parent/camel-spring-xml/src/generated/resources/META-INF/org/apache/camel/spring/xml/threadPool.json
index ac99539187c9..46284132427c 100644
--- 
a/components/camel-spring-parent/camel-spring-xml/src/generated/resources/META-INF/org/apache/camel/spring/xml/threadPool.json
+++ 
b/components/camel-spring-parent/camel-spring-xml/src/generated/resources/META-INF/org/apache/camel/spring/xml/threadPool.json
@@ -19,7 +19,7 @@
     "timeUnit": { "index": 4, "kind": "attribute", "displayName": "Time Unit", 
"group": "common", "required": false, "type": "enum", "javaType": 
"java.util.concurrent.TimeUnit", "enum": [ "NANOSECONDS", "MICROSECONDS", 
"MILLISECONDS", "SECONDS", "MINUTES", "HOURS", "DAYS" ], "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "SECONDS", "description": 
"Sets the time unit used for keep alive time" },
     "maxQueueSize": { "index": 5, "kind": "attribute", "displayName": "Max 
Queue Size", "group": "common", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "description": "Sets the maximum number of tasks in the work 
queue. Use -1 for an unbounded queue" },
     "allowCoreThreadTimeOut": { "index": 6, "kind": "attribute", 
"displayName": "Allow Core Thread Time Out", "group": "common", "required": 
false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, "description": 
"Sets whether to allow core threads to timeout" },
-    "rejectedPolicy": { "index": 7, "kind": "attribute", "displayName": 
"Rejected Policy", "group": "common", "required": false, "type": "enum", 
"javaType": "org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", 
"enum": [ "Abort", "CallerRuns" ], "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": "CallerRuns", "description": "Sets the handler 
for tasks which cannot be executed by the thread pool." },
+    "rejectedPolicy": { "index": 7, "kind": "attribute", "displayName": 
"Rejected Policy", "group": "common", "required": false, "type": "enum", 
"javaType": "org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", 
"enum": [ "Abort", "CallerRuns", "Block" ], "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "CallerRuns", "description": "Sets the 
handler for tasks which cannot be executed by the thread pool." },
     "threadName": { "index": 8, "kind": "attribute", "displayName": "Thread 
Name", "group": "common", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"description": "To use a custom thread name \/ pattern" },
     "scheduled": { "index": 9, "kind": "attribute", "displayName": 
"Scheduled", "group": "common", "required": false, "type": "boolean", 
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Whether to use a 
scheduled thread pool" },
     "camelContextId": { "index": 10, "kind": "attribute", "displayName": 
"Camel Context Id", "group": "common", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "description": "Id of CamelContext to use if there are 
multiple CamelContexts in the same JVM" }
diff --git 
a/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/threadPoolProfile.json
 
b/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/threadPoolProfile.json
index 152d7a9935c1..fa8a0a18b9b4 100644
--- 
a/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/threadPoolProfile.json
+++ 
b/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/threadPoolProfile.json
@@ -22,6 +22,6 @@
     "timeUnit": { "index": 7, "kind": "attribute", "displayName": "Time Unit", 
"group": "advanced", "label": "advanced", "required": false, "type": "enum", 
"javaType": "java.util.concurrent.TimeUnit", "enum": [ "NANOSECONDS", 
"MICROSECONDS", "MILLISECONDS", "SECONDS", "MINUTES", "HOURS", "DAYS" ], 
"deprecated": false, "autowired": false, "secret": false, "description": "Sets 
the time unit to use for keep alive time By default SECONDS is used." },
     "maxQueueSize": { "index": 8, "kind": "attribute", "displayName": "Max 
Queue Size", "group": "common", "required": false, "type": "integer", 
"javaType": "java.lang.Integer", "deprecated": false, "autowired": false, 
"secret": false, "description": "Sets the maximum number of tasks in the work 
queue. Use -1 or Integer.MAX_VALUE for an unbounded queue" },
     "allowCoreThreadTimeOut": { "index": 9, "kind": "attribute", 
"displayName": "Allow Core Thread Time Out", "group": "advanced", "label": 
"advanced", "required": false, "type": "boolean", "javaType": 
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Whether idle core threads is allowed to 
timeout and therefore can shrink the pool size below the core pool size Is by 
default true" },
-    "rejectedPolicy": { "index": 10, "kind": "attribute", "displayName": 
"Rejected Policy", "group": "advanced", "label": "advanced", "required": false, 
"type": "enum", "javaType": 
"org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort", 
"CallerRuns" ], "deprecated": false, "autowired": false, "secret": false, 
"description": "Sets the handler for tasks which cannot be executed by the 
thread pool." }
+    "rejectedPolicy": { "index": 10, "kind": "attribute", "displayName": 
"Rejected Policy", "group": "advanced", "label": "advanced", "required": false, 
"type": "enum", "javaType": 
"org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort", 
"CallerRuns", "Block" ], "deprecated": false, "autowired": false, "secret": 
false, "description": "Sets the handler for tasks which cannot be executed by 
the thread pool." }
   }
 }
diff --git 
a/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/threads.json
 
b/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/threads.json
index aae4d04db0d8..cb1c9325c8ee 100644
--- 
a/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/threads.json
+++ 
b/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/threads.json
@@ -24,7 +24,7 @@
     "maxQueueSize": { "index": 9, "kind": "attribute", "displayName": "Max 
Queue Size", "group": "common", "required": false, "type": "integer", 
"javaType": "java.lang.Integer", "deprecated": false, "autowired": false, 
"secret": false, "description": "Sets the maximum number of tasks in the work 
queue. Use -1 or Integer.MAX_VALUE for an unbounded queue" },
     "allowCoreThreadTimeOut": { "index": 10, "kind": "attribute", 
"displayName": "Allow Core Thread Time Out", "group": "advanced", "label": 
"advanced", "required": false, "type": "boolean", "javaType": 
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Whether idle core threads are allowed to 
timeout and therefore can shrink the pool size below the core pool size Is by 
default false" },
     "threadName": { "index": 11, "kind": "attribute", "displayName": "Thread 
Name", "group": "common", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "Threads", "description": "Sets the thread name to use." },
-    "rejectedPolicy": { "index": 12, "kind": "attribute", "displayName": 
"Rejected Policy", "group": "advanced", "label": "advanced", "required": false, 
"type": "enum", "javaType": 
"org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort", 
"CallerRuns" ], "deprecated": false, "autowired": false, "secret": false, 
"description": "Sets the handler for tasks which cannot be executed by the 
thread pool." },
+    "rejectedPolicy": { "index": 12, "kind": "attribute", "displayName": 
"Rejected Policy", "group": "advanced", "label": "advanced", "required": false, 
"type": "enum", "javaType": 
"org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort", 
"CallerRuns", "Block" ], "deprecated": false, "autowired": false, "secret": 
false, "description": "Sets the handler for tasks which cannot be executed by 
the thread pool." },
     "callerRunsWhenRejected": { "index": 13, "kind": "attribute", 
"displayName": "Caller Runs When Rejected", "group": "advanced", "label": 
"advanced", "required": false, "type": "boolean", "javaType": 
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": true, "description": "Whether or not to use as caller runs as 
fallback when a task is rejected being added to the thread pool (when its 
full). This is only used as fallback if no rejectedPolicy h [...]
   }
 }
diff --git 
a/core/camel-core-model/src/main/java/org/apache/camel/model/ThreadPoolProfileDefinition.java
 
b/core/camel-core-model/src/main/java/org/apache/camel/model/ThreadPoolProfileDefinition.java
index 50533e929eef..1c26ee020cdf 100644
--- 
a/core/camel-core-model/src/main/java/org/apache/camel/model/ThreadPoolProfileDefinition.java
+++ 
b/core/camel-core-model/src/main/java/org/apache/camel/model/ThreadPoolProfileDefinition.java
@@ -58,7 +58,7 @@ public class ThreadPoolProfileDefinition extends 
OptionalIdentifiedDefinition<Th
     private String allowCoreThreadTimeOut;
     @XmlAttribute
     @Metadata(label = "advanced", javaType = 
"org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy",
-              enums = "Abort,CallerRuns")
+              enums = "Abort,CallerRuns,Block")
     private String rejectedPolicy;
 
     public ThreadPoolProfileDefinition() {
diff --git 
a/core/camel-core-model/src/main/java/org/apache/camel/model/ThreadsDefinition.java
 
b/core/camel-core-model/src/main/java/org/apache/camel/model/ThreadsDefinition.java
index 92df53d49d0e..c8827b4e9bf4 100644
--- 
a/core/camel-core-model/src/main/java/org/apache/camel/model/ThreadsDefinition.java
+++ 
b/core/camel-core-model/src/main/java/org/apache/camel/model/ThreadsDefinition.java
@@ -67,7 +67,7 @@ public class ThreadsDefinition extends 
NoOutputDefinition<ThreadsDefinition>
     private String threadName;
     @XmlAttribute
     @Metadata(label = "advanced", javaType = 
"org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy",
-              enums = "Abort,CallerRuns")
+              enums = "Abort,CallerRuns,Block")
     private String rejectedPolicy;
     @XmlAttribute
     @Metadata(label = "advanced", javaType = "java.lang.Boolean", defaultValue 
= "true")
diff --git 
a/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelThreadPoolFactoryBean.java
 
b/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelThreadPoolFactoryBean.java
index 4b73caaa1abf..a33721b1b4d6 100644
--- 
a/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelThreadPoolFactoryBean.java
+++ 
b/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelThreadPoolFactoryBean.java
@@ -58,7 +58,7 @@ public abstract class AbstractCamelThreadPoolFactoryBean 
extends AbstractCamelFa
     @XmlAttribute
     @Metadata(description = "Sets the handler for tasks which cannot be 
executed by the thread pool.",
               defaultValue = "CallerRuns", javaType = 
"org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy",
-              enums = "Abort,CallerRuns")
+              enums = "Abort,CallerRuns,Block")
     private String rejectedPolicy = ThreadPoolRejectedPolicy.CallerRuns.name();
     @XmlAttribute(required = true)
     @Metadata(description = "To use a custom thread name / pattern")
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultThreadPoolFactory.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultThreadPoolFactory.java
index 70e6f5fb5c50..33110ef99690 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultThreadPoolFactory.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultThreadPoolFactory.java
@@ -34,10 +34,12 @@ import org.apache.camel.StaticService;
 import org.apache.camel.spi.ThreadPoolFactory;
 import org.apache.camel.spi.ThreadPoolProfile;
 import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.util.concurrent.BoundedExecutorService;
 import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor;
 import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor;
 import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
 import org.apache.camel.util.concurrent.ThreadFactoryTypeAware;
+import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy;
 import org.apache.camel.util.concurrent.ThreadType;
 
 /**
@@ -64,6 +66,19 @@ public class DefaultThreadPoolFactory extends ServiceSupport 
implements CamelCon
 
     @Override
     public ExecutorService newThreadPool(ThreadPoolProfile profile, 
ThreadFactory factory) {
+        // Virtual threads: use the policy enum directly from the profile to 
avoid reverse-mapping
+        if (profile.getMaxQueueSize() > 0
+                && ThreadPoolFactoryType.from(factory, profile) == 
ThreadPoolFactoryType.VIRTUAL) {
+            ThreadPoolRejectedPolicy policy = profile.getRejectedPolicy();
+            if (policy == null) {
+                policy = ThreadPoolRejectedPolicy.CallerRuns;
+            }
+            return new BoundedExecutorService(
+                    ThreadPoolFactoryType.newThreadPerTaskExecutor(factory),
+                    profile.getMaxQueueSize(),
+                    profile.getKeepAliveTime(), profile.getTimeUnit(),
+                    false, policy);
+        }
         // allow core thread timeout is default true if not configured
         boolean allow = profile.getAllowCoreThreadTimeOut() != null ? 
profile.getAllowCoreThreadTimeOut() : true;
         return newThreadPool(profile.getPoolSize(),
@@ -199,7 +214,7 @@ public class DefaultThreadPoolFactory extends 
ServiceSupport implements CamelCon
         }
 
         @SuppressWarnings("unchecked")
-        private static ExecutorService newThreadPerTaskExecutor(ThreadFactory 
threadFactory) {
+        static ExecutorService newThreadPerTaskExecutor(ThreadFactory 
threadFactory) {
             try {
                 return (ExecutorService) Executors.class
                         .getMethod("newThreadPerTaskExecutor", 
ThreadFactory.class)
diff --git a/core/camel-util/pom.xml b/core/camel-util/pom.xml
index 41a1bed2bcc9..7da60eb85dfb 100644
--- a/core/camel-util/pom.xml
+++ b/core/camel-util/pom.xml
@@ -51,6 +51,12 @@
             <artifactId>junit-jupiter</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <version>${awaitility-version}</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.assertj</groupId>
             <artifactId>assertj-core</artifactId>
diff --git 
a/core/camel-util/src/main/java/org/apache/camel/util/concurrent/BoundedExecutorService.java
 
b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/BoundedExecutorService.java
new file mode 100644
index 000000000000..6111377dcce5
--- /dev/null
+++ 
b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/BoundedExecutorService.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
+
+/**
+ * An {@link ExecutorService} wrapper that enforces bounded concurrency via a 
{@link Semaphore}.
+ * <p>
+ * When virtual threads are enabled, Camel replaces the traditional {@link 
java.util.concurrent.ThreadPoolExecutor} with
+ * {@code Executors.newThreadPerTaskExecutor()}, which accepts every task 
immediately (unbounded). This wrapper limits
+ * the maximum number of tasks delegated to the underlying executor. Unlike 
{@code ThreadPoolExecutor} there is no
+ * distinction between pool threads and queued tasks — the semaphore enforces 
a flat concurrency cap on delegated tasks.
+ * <p>
+ * When the semaphore has no available permits, behavior depends on the 
configured {@link ThreadPoolRejectedPolicy}:
+ * <ul>
+ * <li><b>CallerRuns</b> (default): blocks until a permit is available or the 
timeout expires; on timeout, runs the task
+ * on the caller's thread. Tasks are never lost. Note that caller-run tasks 
execute outside semaphore accounting, so
+ * total system concurrency may temporarily exceed {@code maxConcurrent}.</li>
+ * <li><b>Abort</b>: blocks until a permit is available or the timeout 
expires; on timeout, throws
+ * {@link RejectedExecutionException}.</li>
+ * <li><b>Block</b>: blocks indefinitely until a permit becomes available. No 
timeout, no rejection.</li>
+ * </ul>
+ * <p>
+ * <b>Caller thread blocking:</b> while waiting for a permit, the calling 
thread is blocked. When callers are virtual
+ * threads this is inexpensive (the carrier thread is released). When callers 
are platform threads (e.g., HTTP server
+ * threads) the blocked thread is unavailable for other work — this is 
standard backpressure behavior but worth noting
+ * for capacity planning.
+ *
+ */
+public class BoundedExecutorService extends AbstractExecutorService {
+
+    private final ExecutorService delegate;
+    private final Semaphore semaphore;
+    private final int maxConcurrent;
+    private final long timeoutNanos;
+    private final ThreadPoolRejectedPolicy rejectedPolicy;
+    private final LongAdder callerRunsCount = new LongAdder();
+    private final LongAdder rejectedCount = new LongAdder();
+    private final LongAdder delegatedTaskCount = new LongAdder();
+
+    /**
+     * @param delegate       the underlying executor (typically {@code 
newThreadPerTaskExecutor})
+     * @param maxConcurrent  the maximum number of tasks delegated to the 
underlying executor concurrently
+     * @param acquireTimeout the maximum time to wait for a permit (ignored 
when policy is {@code Block})
+     * @param timeUnit       the time unit for {@code acquireTimeout}
+     * @param fair           {@code true} for FIFO permit ordering 
(predictable latency), {@code false} for barging
+     *                       (higher throughput)
+     * @param rejectedPolicy the policy to apply when no permit is available
+     */
+    public BoundedExecutorService(ExecutorService delegate, int maxConcurrent,
+                                  long acquireTimeout, TimeUnit timeUnit,
+                                  boolean fair, ThreadPoolRejectedPolicy 
rejectedPolicy) {
+        this.delegate = delegate;
+        this.maxConcurrent = maxConcurrent;
+        this.semaphore = new Semaphore(maxConcurrent, fair);
+        this.timeoutNanos = timeUnit.toNanos(acquireTimeout);
+        this.rejectedPolicy = rejectedPolicy;
+    }
+
+    @Override
+    public void execute(Runnable command) {
+        if (delegate.isShutdown()) {
+            throw new RejectedExecutionException("Executor has been shut 
down");
+        }
+
+        boolean acquired = false;
+        try {
+            if (rejectedPolicy == ThreadPoolRejectedPolicy.Block) {
+                semaphore.acquire();
+                acquired = true;
+            } else {
+                acquired = semaphore.tryAcquire(timeoutNanos, 
TimeUnit.NANOSECONDS);
+            }
+
+            if (!acquired) {
+                if (rejectedPolicy == ThreadPoolRejectedPolicy.CallerRuns) {
+                    callerRunsCount.increment();
+                    command.run();
+                    return;
+                }
+                rejectedCount.increment();
+                throw new RejectedExecutionException("Executor saturated: 
timed out waiting for a permit");
+            }
+
+            boolean submitted = false;
+            try {
+                delegate.execute(() -> {
+                    try {
+                        command.run();
+                    } finally {
+                        delegatedTaskCount.increment();
+                        semaphore.release();
+                    }
+                });
+                submitted = true;
+            } finally {
+                if (!submitted) {
+                    semaphore.release();
+                }
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RejectedExecutionException("Interrupted while waiting 
for permit", e);
+        }
+    }
+
+    // -- Metrics --
+
+    /**
+     * The maximum number of tasks that can be delegated to the underlying 
executor concurrently. CallerRuns tasks
+     * execute outside this limit.
+     */
+    public int getMaxConcurrent() {
+        return maxConcurrent;
+    }
+
+    /**
+     * The number of permits currently available.
+     */
+    public int getAvailablePermits() {
+        return semaphore.availablePermits();
+    }
+
+    /**
+     * The number of tasks currently delegated to the underlying executor.
+     */
+    public int getActiveCount() {
+        return maxConcurrent - semaphore.availablePermits();
+    }
+
+    /**
+     * The number of threads currently blocked waiting for a permit.
+     */
+    public int getWaitingCount() {
+        return semaphore.getQueueLength();
+    }
+
+    /**
+     * The number of times the timeout expired and a task fell back to running 
on the caller's thread.
+     */
+    public long getCallerRunsCount() {
+        return callerRunsCount.sum();
+    }
+
+    /**
+     * The number of tasks rejected because no permit was available within the 
timeout.
+     */
+    public long getRejectedCount() {
+        return rejectedCount.sum();
+    }
+
+    /**
+     * The total number of tasks that completed via the underlying executor 
(excludes caller-runs).
+     */
+    public long getDelegatedTaskCount() {
+        return delegatedTaskCount.sum();
+    }
+
+    @Override
+    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+        if (runnable instanceof Rejectable) {
+            return new RejectableFutureTask<>(runnable, value);
+        }
+        return super.newTaskFor(runnable, value);
+    }
+
+    @Override
+    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+        if (callable instanceof Rejectable) {
+            return new RejectableFutureTask<>(callable);
+        }
+        return super.newTaskFor(callable);
+    }
+
+    @Override
+    public void shutdown() {
+        delegate.shutdown();
+    }
+
+    @Override
+    public List<Runnable> shutdownNow() {
+        return delegate.shutdownNow();
+    }
+
+    @Override
+    public boolean isShutdown() {
+        return delegate.isShutdown();
+    }
+
+    @Override
+    public boolean isTerminated() {
+        return delegate.isTerminated();
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException {
+        return delegate.awaitTermination(timeout, unit);
+    }
+
+    @Override
+    public String toString() {
+        return "BoundedExecutorService[active=" + getActiveCount()
+               + ", max=" + maxConcurrent
+               + ", waiting=" + getWaitingCount()
+               + ", callerRuns=" + callerRunsCount.sum()
+               + ", rejected=" + rejectedCount.sum()
+               + ", delegated=" + delegatedTaskCount.sum() + "]";
+    }
+}
diff --git 
a/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadPoolRejectedPolicy.java
 
b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadPoolRejectedPolicy.java
index 1d0f47a5bb09..113ac74f69de 100644
--- 
a/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadPoolRejectedPolicy.java
+++ 
b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadPoolRejectedPolicy.java
@@ -31,7 +31,8 @@ import java.util.concurrent.ThreadPoolExecutor;
 public enum ThreadPoolRejectedPolicy {
 
     Abort,
-    CallerRuns;
+    CallerRuns,
+    Block;
 
     public RejectedExecutionHandler asRejectedExecutionHandler() {
         if (this == Abort) {
@@ -57,6 +58,25 @@ public enum ThreadPoolRejectedPolicy {
                     return "CallerRuns";
                 }
             };
+        } else if (this == Block) {
+            return new RejectedExecutionHandler() {
+                @Override
+                public void rejectedExecution(Runnable r, ThreadPoolExecutor 
executor) {
+                    if (!executor.isShutdown()) {
+                        try {
+                            executor.getQueue().put(r);
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            throw new RejectedExecutionException("Interrupted 
while waiting for queue space", e);
+                        }
+                    }
+                }
+
+                @Override
+                public String toString() {
+                    return "Block";
+                }
+            };
         }
         throw new IllegalArgumentException("Unknown ThreadPoolRejectedPolicy: 
" + this);
     }
diff --git 
a/core/camel-util/src/test/java/org/apache/camel/util/concurrent/BoundedExecutorServiceTest.java
 
b/core/camel-util/src/test/java/org/apache/camel/util/concurrent/BoundedExecutorServiceTest.java
new file mode 100644
index 000000000000..1b58012f35d9
--- /dev/null
+++ 
b/core/camel-util/src/test/java/org/apache/camel/util/concurrent/BoundedExecutorServiceTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.jupiter.api.Test;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class BoundedExecutorServiceTest {
+
+    @Test
+    public void testCallerRunsOnTimeout() throws Exception {
+        var delegate = Executors.newCachedThreadPool();
+        var sized = new BoundedExecutorService(
+                delegate, 1, 200, TimeUnit.MILLISECONDS, false, 
ThreadPoolRejectedPolicy.CallerRuns);
+        try {
+            CountDownLatch blockTask = new CountDownLatch(1);
+            sized.execute(() -> {
+                try {
+                    blockTask.await(10, TimeUnit.SECONDS);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            });
+
+            AtomicBoolean ranOnCallerThread = new AtomicBoolean();
+            String callerName = Thread.currentThread().getName();
+
+            sized.execute(() -> ranOnCallerThread
+                    .set(Thread.currentThread().getName().equals(callerName)));
+
+            assertTrue(ranOnCallerThread.get(),
+                    "After timeout, task should run on the caller's thread");
+            assertEquals(1, sized.getCallerRunsCount());
+            assertEquals(0, sized.getRejectedCount());
+
+            blockTask.countDown();
+        } finally {
+            sized.shutdown();
+            delegate.shutdown();
+        }
+    }
+
+    @Test
+    public void testAbortOnTimeout() throws Exception {
+        var delegate = Executors.newCachedThreadPool();
+        var sized = new BoundedExecutorService(
+                delegate, 1, 200, TimeUnit.MILLISECONDS, false, 
ThreadPoolRejectedPolicy.Abort);
+        try {
+            CountDownLatch blockTask = new CountDownLatch(1);
+            sized.execute(() -> {
+                try {
+                    blockTask.await(10, TimeUnit.SECONDS);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            });
+
+            assertThrows(RejectedExecutionException.class,
+                    () -> sized.execute(() -> {
+                    }),
+                    "Should reject after timeout with Abort policy");
+            assertEquals(0, sized.getCallerRunsCount());
+            assertEquals(1, sized.getRejectedCount());
+
+            blockTask.countDown();
+        } finally {
+            sized.shutdown();
+            delegate.shutdown();
+        }
+    }
+
+    @Test
+    public void testBlockForeverPolicy() throws Exception {
+        var delegate = Executors.newCachedThreadPool();
+        var sized = new BoundedExecutorService(
+                delegate, 1, 60, TimeUnit.SECONDS, false, 
ThreadPoolRejectedPolicy.Block);
+        try {
+            CountDownLatch blockTask = new CountDownLatch(1);
+            CountDownLatch secondStarted = new CountDownLatch(1);
+            CountDownLatch submitterBlocked = new CountDownLatch(1);
+
+            sized.execute(() -> {
+                try {
+                    blockTask.await(10, TimeUnit.SECONDS);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            });
+
+            Thread submitter = new Thread(() -> {
+                submitterBlocked.countDown();
+                sized.execute(secondStarted::countDown);
+            });
+            submitter.start();
+
+            assertTrue(submitterBlocked.await(5, TimeUnit.SECONDS));
+            await().atMost(5, TimeUnit.SECONDS)
+                    .untilAsserted(() -> assertTrue(sized.getWaitingCount() > 
0,
+                            "Submitter should be blocked waiting for a 
permit"));
+
+            blockTask.countDown();
+
+            assertTrue(secondStarted.await(5, TimeUnit.SECONDS),
+                    "Second task should run after first completes");
+            submitter.join(5000);
+        } finally {
+            sized.shutdown();
+            delegate.shutdown();
+        }
+    }
+
+    @Test
+    public void testConcurrencyBounded() throws Exception {
+        var delegate = Executors.newCachedThreadPool();
+        int maxConcurrent = 3;
+        var sized = new BoundedExecutorService(
+                delegate, maxConcurrent, 60, TimeUnit.SECONDS, false, 
ThreadPoolRejectedPolicy.Block);
+        try {
+            AtomicInteger inFlight = new AtomicInteger();
+            AtomicInteger peak = new AtomicInteger();
+            int totalTasks = 20;
+            CountDownLatch holdTasks = new CountDownLatch(1);
+            CountDownLatch allDone = new CountDownLatch(totalTasks);
+
+            ExecutorService senders = Executors.newFixedThreadPool(totalTasks);
+            for (int i = 0; i < totalTasks; i++) {
+                senders.submit(() -> sized.execute(() -> {
+                    int current = inFlight.incrementAndGet();
+                    peak.accumulateAndGet(current, Math::max);
+                    try {
+                        holdTasks.await(10, TimeUnit.SECONDS);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    }
+                    inFlight.decrementAndGet();
+                    allDone.countDown();
+                }));
+            }
+
+            await().atMost(5, TimeUnit.SECONDS)
+                    .untilAsserted(() -> assertEquals(maxConcurrent, 
sized.getActiveCount()));
+
+            holdTasks.countDown();
+            assertTrue(allDone.await(30, TimeUnit.SECONDS), "All tasks should 
complete");
+            assertTrue(peak.get() <= maxConcurrent,
+                    "Peak concurrency (" + peak.get() + ") should be <= " + 
maxConcurrent);
+            senders.shutdown();
+        } finally {
+            sized.shutdown();
+            delegate.shutdown();
+        }
+    }
+
+    @Test
+    public void testPermitsReleasedAfterCompletion() throws Exception {
+        var delegate = Executors.newCachedThreadPool();
+        var sized = new BoundedExecutorService(
+                delegate, 2, 60, TimeUnit.SECONDS, false, 
ThreadPoolRejectedPolicy.Block);
+        try {
+            CountDownLatch firstBatch = new CountDownLatch(2);
+            for (int i = 0; i < 2; i++) {
+                sized.execute(firstBatch::countDown);
+            }
+            assertTrue(firstBatch.await(5, TimeUnit.SECONDS), "First batch 
should complete");
+
+            await().atMost(5, TimeUnit.SECONDS)
+                    .untilAsserted(() -> assertEquals(2, 
sized.getAvailablePermits()));
+
+            CountDownLatch secondBatch = new CountDownLatch(2);
+            for (int i = 0; i < 2; i++) {
+                sized.execute(secondBatch::countDown);
+            }
+            assertTrue(secondBatch.await(5, TimeUnit.SECONDS),
+                    "Second batch should succeed after permits are released");
+        } finally {
+            sized.shutdown();
+            delegate.shutdown();
+        }
+    }
+
+    @Test
+    public void testSubmitAfterShutdown() {
+        var delegate = Executors.newCachedThreadPool();
+        var sized = new BoundedExecutorService(
+                delegate, 5, 60, TimeUnit.SECONDS, false, 
ThreadPoolRejectedPolicy.CallerRuns);
+
+        sized.shutdown();
+        assertTrue(sized.isShutdown());
+        assertTrue(delegate.isShutdown());
+
+        assertThrows(RejectedExecutionException.class,
+                () -> sized.execute(() -> {
+                }),
+                "Should reject after shutdown");
+    }
+
+    @Test
+    public void testSubmitReturnsFuture() throws Exception {
+        var delegate = Executors.newCachedThreadPool();
+        var sized = new BoundedExecutorService(
+                delegate, 5, 60, TimeUnit.SECONDS, false, 
ThreadPoolRejectedPolicy.CallerRuns);
+        try {
+            AtomicBoolean executed = new AtomicBoolean();
+            Future<?> future = sized.submit(() -> executed.set(true));
+
+            future.get(5, TimeUnit.SECONDS);
+            assertTrue(executed.get(), "Task submitted via submit() should 
execute");
+            assertTrue(future.isDone());
+        } finally {
+            sized.shutdown();
+            delegate.shutdown();
+        }
+    }
+}
diff --git 
a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_21.adoc 
b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_21.adoc
index b2a8f57fddf4..da3bd2aae50e 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_21.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_21.adoc
@@ -28,6 +28,21 @@ your own code or tooling, add `org.jspecify:jspecify` 
explicitly to your project
 
 The `org.apache.camel.support.DefaultHeaderFilterStrategy` changed default 
setting for lowercase from `false` to `true`.
 
+==== Virtual threads: `maxQueueSize` now honored in `threads()` EIP
+
+When virtual threads are enabled (`camel.threads.virtual.enabled=true`), the 
`threads()` EIP now honors `maxQueueSize`
+for backpressure. Previously, `maxQueueSize` was silently ignored and tasks 
were accepted unboundedly.
+
+The virtual thread executor is wrapped with a semaphore-based concurrency 
limit (`BoundedExecutorService`) that enforces
+a flat cap of `maxQueueSize` on delegated tasks. The `keepAliveTime` parameter 
is repurposed as the
+semaphore acquisition timeout (pool sizing parameters are not applicable to 
virtual threads).
+
+==== New `Block` rejected policy
+
+A new `Block` value has been added to `ThreadPoolRejectedPolicy`. With 
`Block`, the caller blocks indefinitely until
+capacity becomes available. This applies to both platform and virtual threads. 
The existing `CallerRuns` (default) and
+`Abort` policies are unchanged.
+
 The type converters for Java serialized objects with types 
`java.io.ObjectInput` and `java.io.ObjectOutput` has been removed.
 Java object serialization is a recurring source of security issues and 
therefore these converters has been removed.
 These converters are not used at all by Camel itself. To restore compatibility 
then end users can add these type converters back as custom converters in their 
own Camel applications.
@@ -67,6 +82,26 @@ auto-disables `contentCache` on resource-based components 
(such as `xslt`) whose
 the route. Set `camel.component.<name>.contentCache=true` (or pass 
`?contentCache=true` on the
 URI) to opt back in to caching during dev mode.
 
+==== Unified `--packaging` option for `camel export`
+
+A new `--packaging` option has been added to `camel export` that works across 
all three runtimes
+(Camel Main, Spring Boot, and Quarkus). It replaces the Quarkus-specific 
`--quarkus-package-type`
+option, which is now deprecated.
+
+Accepted values:
+
+- `layered` or `fast-jar` — container-optimized packaging with separate 
dependency layers (default)
+- `fat-jar` or `uber-jar` — single executable JAR
+
+The default is `layered`, which produces Dockerfiles optimized for container 
image layer caching.
+Each runtime implements layered packaging using its native mechanism:
+
+- **Camel Main**: thin JAR with dependencies in a `lib/` folder
+- **Spring Boot**: multi-stage Dockerfile using Spring Boot's built-in layer 
extraction
+- **Quarkus**: fast-jar packaging (unchanged from the previous 
`--quarkus-package-type=fast-jar` default)
+
+The deprecated `--quarkus-package-type` option continues to work for backward 
compatibility.
+
 ==== Improved default `--quarkus-version`
 
 The default behavior of commands requiring Quarkus Platform version has 
improved.
diff --git a/docs/user-manual/modules/ROOT/pages/threading-model.adoc 
b/docs/user-manual/modules/ROOT/pages/threading-model.adoc
index fb2afc1ed5b5..b419a3a1410a 100644
--- a/docs/user-manual/modules/ROOT/pages/threading-model.adoc
+++ b/docs/user-manual/modules/ROOT/pages/threading-model.adoc
@@ -28,8 +28,8 @@ The default profile is pre-configured out of the box with the 
following settings
 | *maxPoolSize* | `20` | Sets the default maximum pool size
 | *maxQueueSize* | `1000` | Sets the default maximum number of tasks in the 
work queue. Use -1 for an unbounded queue.
 | *allowCoreThreadTimeOut* | `true` | Sets default whether to allow core 
threads to timeout
-| *rejectedPolicy* | `CallerRuns` | Sets the default handler for tasks which 
cannot be executed by the thread pool. Has four options:
-`Abort, CallerRuns, Discard, DiscardOldest` which corresponds to the same four 
options provided out of the box in the JDK.
+| *rejectedPolicy* | `CallerRuns` | Sets the default handler for tasks which 
cannot be executed by the thread pool. Has three options:
+`Abort`, `CallerRuns`, `Block`. See <<rejected-policy>> for details.
 |===
 
 What that means is that for example when you use
@@ -310,15 +310,63 @@ To hook in custom thread pool providers a
 `ThreadPoolFactory` interface can be implemented. The implementation can
 be set in the `ExecutorServiceManager`.
 
+[[rejected-policy]]
+== Rejected Policy
+
+The `rejectedPolicy` option controls what happens when a thread pool cannot 
accept a new task
+(i.e., the pool and its work queue are full). The available policies are:
+
+[width="100%",cols="20%,80%",options="header"]
+|===
+| Policy | Description
+| *CallerRuns* | The task runs on the caller's thread. This provides natural 
backpressure — the caller is blocked
+doing useful work and cannot submit more tasks until it finishes. Tasks are 
never lost. This is the default.
+| *Abort* | The task is rejected with a `RejectedExecutionException`. Use this 
for HTTP APIs or latency-sensitive
+systems where failing fast is preferred over blocking.
+| *Block* | The caller blocks indefinitely until capacity becomes available. 
No timeout, no rejection. Use this for
+message broker consumers and batch workloads where losing a task is 
unacceptable and latency is less critical.
+|===
+
+With platform threads, these policies apply when the `ThreadPoolExecutor` work 
queue is full. With virtual threads,
+the same policies apply when the concurrency semaphore has no available 
permits (see <<virtual-threads>>).
+
+[[virtual-threads]]
 == Virtual Threads
 
 Starting from Java 21, the default `ThreadPoolFactory` can build 
`ExecutorService` and `ScheduledExecutorService` that
-use https://openjdk.org/jeps/425[virtual threads] instead of platform threads.
+use https://openjdk.org/jeps/444[virtual threads] instead of platform threads.
 
-But as it is an experimental feature, it is not enabled by default, you need 
to set the System property `camel.threads.virtual.enabled`
-to `true` and run Camel using Java 21 or above to enable it.
+To enable virtual threads, set the System property 
`camel.threads.virtual.enabled` to `true` and run Camel using
+Java 21 or above.
 
 Be aware that even if it is enabled, there are some use cases where platform 
threads are still used, for example, if the
 thread factory is configured to create non-daemon threads since virtual 
threads can only be daemons, or when the
 `ExecutorService` or `ScheduledExecutorService` to build cannot have more than 
one thread or finally when `corePoolSize`
 is set to zero and `maxQueueSize` is set to a value less or equal to `0`.
+
+=== Bounded Concurrency with Virtual Threads
+
+When `maxQueueSize` is set to a positive value, Camel wraps the virtual thread 
executor with a semaphore-based
+concurrency limit. This ensures that `maxQueueSize` is honored for 
backpressure, even though virtual threads do not
+use a traditional work queue.
+
+Unlike `ThreadPoolExecutor` where pool threads and queued tasks are distinct 
concepts, the virtual thread executor
+enforces a flat concurrency cap: the maximum number of concurrently executing 
tasks equals `maxQueueSize`.
+Pool sizing parameters (`poolSize`, `maxPoolSize`) are ignored since virtual 
threads are not pooled.
+All permitted tasks execute immediately on virtual threads — there is no queue 
of waiting tasks.
+
+The `rejectedPolicy` controls what happens when the concurrency limit is 
reached:
+
+* *CallerRuns* (default): the caller blocks up to `keepAliveTime` waiting for 
a permit. If the timeout expires,
+the task runs on the caller's thread.
+* *Abort*: the caller blocks up to `keepAliveTime` waiting for a permit. If 
the timeout expires,
+a `RejectedExecutionException` is thrown.
+* *Block*: the caller blocks indefinitely until a permit becomes available.
+
+While waiting for a permit, the calling thread is blocked. When callers are 
virtual threads this is inexpensive
+(the carrier thread is released). When callers are platform threads (e.g., 
HTTP server threads) the blocked thread
+is unavailable for other work.
+
+Pool sizing parameters (`poolSize`, `maxPoolSize`, `keepAliveTime`) do not 
control thread reuse with virtual threads
+since virtual threads are cheap to create and are never pooled. However, 
`keepAliveTime` is reused as the timeout
+for permit acquisition with `CallerRuns` and `Abort` policies.
diff --git a/docs/user-manual/modules/ROOT/pages/virtual-threads.adoc 
b/docs/user-manual/modules/ROOT/pages/virtual-threads.adoc
index 06d8501a018e..0991ff14b7b4 100644
--- a/docs/user-manual/modules/ROOT/pages/virtual-threads.adoc
+++ b/docs/user-manual/modules/ROOT/pages/virtual-threads.adoc
@@ -91,15 +91,24 @@ When virtual threads are enabled, Camel's 
`DefaultThreadPoolFactory` (JDK 21+ va
 | `Executors.newCachedThreadPool()`
 | `Executors.newThreadPerTaskExecutor()`
 
-| `newThreadPool()` (poolSize > 1)
-| `ThreadPoolExecutor`
-| `Executors.newThreadPerTaskExecutor()`
+| `newThreadPool()` (maxQueueSize > 0)
+| `ThreadPoolExecutor` with bounded queue
+| `Executors.newThreadPerTaskExecutor()` wrapped with semaphore-based 
concurrency limit
+
+| `newThreadPool()` (maxQueueSize ≤ 0)
+| `ThreadPoolExecutor` with `SynchronousQueue`
+| `Executors.newThreadPerTaskExecutor()` (unbounded)
 
 | `newScheduledThreadPool()`
 | `ScheduledThreadPoolExecutor`
 | `Executors.newScheduledThreadPool(0, factory)`
 |===
 
+When `maxQueueSize` is set to a positive value, the virtual thread executor is 
wrapped with a semaphore that enforces
+a flat concurrency cap of `maxQueueSize`. Unlike `ThreadPoolExecutor` where 
pool threads and queued tasks
+are distinct, all permitted tasks execute immediately on virtual threads. The 
`rejectedPolicy` controls what happens
+when the concurrency limit is reached — see 
xref:threading-model.adoc#rejected-policy[Rejected Policy] for details.
+
 [NOTE]
 ====
 Single-threaded executors and scheduled tasks still use platform threads, as 
virtual threads are optimized for concurrent I/O-bound work, not scheduled or 
sequential tasks.
diff --git 
a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
 
b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
index 4d3797581b51..45bf777d70f5 100644
--- 
a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
+++ 
b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
@@ -17646,7 +17646,7 @@ public final class ModelDeserializers extends 
YamlDeserializerSupport {
                     @YamlProperty(name = "maxQueueSize", type = "number", 
description = "Sets the maximum number of tasks in the work queue. Use -1 or 
Integer.MAX_VALUE for an unbounded queue", displayName = "Max Queue Size"),
                     @YamlProperty(name = "note", type = "string", description 
= "Sets the note of this node", displayName = "Note"),
                     @YamlProperty(name = "poolSize", type = "number", 
description = "Sets the core pool size", displayName = "Pool Size"),
-                    @YamlProperty(name = "rejectedPolicy", type = 
"enum:Abort,CallerRuns", description = "Sets the handler for tasks which cannot 
be executed by the thread pool.", displayName = "Rejected Policy"),
+                    @YamlProperty(name = "rejectedPolicy", type = 
"enum:Abort,CallerRuns,Block", description = "Sets the handler for tasks which 
cannot be executed by the thread pool.", displayName = "Rejected Policy"),
                     @YamlProperty(name = "timeUnit", type = 
"enum:NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS", 
description = "Sets the time unit to use for keep alive time By default SECONDS 
is used.", displayName = "Time Unit")
             }
     )
@@ -17747,7 +17747,7 @@ public final class ModelDeserializers extends 
YamlDeserializerSupport {
                     @YamlProperty(name = "maxQueueSize", type = "number", 
description = "Sets the maximum number of tasks in the work queue. Use -1 or 
Integer.MAX_VALUE for an unbounded queue", displayName = "Max Queue Size"),
                     @YamlProperty(name = "note", type = "string", description 
= "Sets the note of this node", displayName = "Note"),
                     @YamlProperty(name = "poolSize", type = "number", 
description = "Sets the core pool size", displayName = "Pool Size"),
-                    @YamlProperty(name = "rejectedPolicy", type = 
"enum:Abort,CallerRuns", description = "Sets the handler for tasks which cannot 
be executed by the thread pool.", displayName = "Rejected Policy"),
+                    @YamlProperty(name = "rejectedPolicy", type = 
"enum:Abort,CallerRuns,Block", description = "Sets the handler for tasks which 
cannot be executed by the thread pool.", displayName = "Rejected Policy"),
                     @YamlProperty(name = "threadName", type = "string", 
defaultValue = "Threads", description = "Sets the thread name to use.", 
displayName = "Thread Name"),
                     @YamlProperty(name = "timeUnit", type = 
"enum:NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS", 
description = "Sets the keep alive time unit. By default SECONDS is used.", 
displayName = "Time Unit")
             }
diff --git 
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl-canonical.json
 
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl-canonical.json
index eaaeefe3187f..8438e7a74b37 100644
--- 
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl-canonical.json
+++ 
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl-canonical.json
@@ -4842,7 +4842,7 @@
             "type" : "string",
             "title" : "Rejected Policy",
             "description" : "Sets the handler for tasks which cannot be 
executed by the thread pool.",
-            "enum" : [ "Abort", "CallerRuns" ]
+            "enum" : [ "Abort", "CallerRuns", "Block" ]
           },
           "timeUnit" : {
             "type" : "string",
@@ -4920,7 +4920,7 @@
             "type" : "string",
             "title" : "Rejected Policy",
             "description" : "Sets the handler for tasks which cannot be 
executed by the thread pool.",
-            "enum" : [ "Abort", "CallerRuns" ]
+            "enum" : [ "Abort", "CallerRuns", "Block" ]
           },
           "threadName" : {
             "type" : "string",
diff --git 
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json
 
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json
index 26fe66fb3b61..9baa494c457e 100644
--- 
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json
+++ 
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json
@@ -7528,7 +7528,7 @@
             "type" : "string",
             "title" : "Rejected Policy",
             "description" : "Sets the handler for tasks which cannot be 
executed by the thread pool.",
-            "enum" : [ "Abort", "CallerRuns" ]
+            "enum" : [ "Abort", "CallerRuns", "Block" ]
           },
           "timeUnit" : {
             "type" : "string",
@@ -7606,7 +7606,7 @@
             "type" : "string",
             "title" : "Rejected Policy",
             "description" : "Sets the handler for tasks which cannot be 
executed by the thread pool.",
-            "enum" : [ "Abort", "CallerRuns" ]
+            "enum" : [ "Abort", "CallerRuns", "Block" ]
           },
           "threadName" : {
             "type" : "string",

Reply via email to