This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
     new 9674ba1  CAMEL-16861: Cleanup and update EIP docs
9674ba1 is described below

commit 9674ba16405873d57791c5202f188c4d28577f61
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Thu Oct 21 09:18:29 2021 +0200

    CAMEL-16861: Cleanup and update EIP docs
---
 .../pages/enterprise-integration-patterns.adoc     |   4 +
 .../main/docs/modules/eips/pages/threads-eip.adoc  | 111 +++++++++++++++++----
 .../camel/processor/ThreadsMaxQueueSizeTest.java   |   3 +-
 ...eTest.java => ThreadsZeroMaxQueueSizeTest.java} |  11 +-
 4 files changed, 99 insertions(+), 30 deletions(-)

diff --git a/core/camel-core-engine/src/main/docs/modules/eips/pages/enterprise-integration-patterns.adoc b/core/camel-core-engine/src/main/docs/modules/eips/pages/enterprise-integration-patterns.adoc
index 4143e71..f2a3185 100644
--- a/core/camel-core-engine/src/main/docs/modules/eips/pages/enterprise-integration-patterns.adoc
+++ b/core/camel-core-engine/src/main/docs/modules/eips/pages/enterprise-integration-patterns.adoc
@@ -166,6 +166,10 @@ destination of a message from the sender and maintain central control over the flow of messages?
 a|image::eip/MessagingAdapterIcon.gif[image]
+|xref:threads-eip.adoc[Threads] |How can I decouple the continued routing
+of a message from the current thread?
+
+a|image::eip/MessagingAdapterIcon.gif[image]
 |xref:throttle-eip.adoc[Throttler] |How can I throttle messages to ensure that a specific endpoint does not get overloaded, or we don't exceed an agreed SLA with some external service?
diff --git a/core/camel-core-engine/src/main/docs/modules/eips/pages/threads-eip.adoc b/core/camel-core-engine/src/main/docs/modules/eips/pages/threads-eip.adoc
index ce80bd1..a5df0bf 100644
--- a/core/camel-core-engine/src/main/docs/modules/eips/pages/threads-eip.adoc
+++ b/core/camel-core-engine/src/main/docs/modules/eips/pages/threads-eip.adoc
@@ -5,41 +5,48 @@
 :since:
 :supportlevel: Stable
+How can I decouple the continued routing of a message from the current thread?
+
+image::eip/MessagingAdapterIcon.gif[image]
+
+Submit the message to a thread pool, which then is responsible for the continued routing of the message.
+
+In Camel this is implemented as the Threads EIP.
+
 == Options
 // eip options: START
 include::partial$eip-options.adoc[]
 // eip options: END
-== About rejected tasks
+== Using Threads EIP
-The Threads EIP uses a thread pool which has a worker queue for tasks.
-When the worker queue gets full, the task is rejected. You can customize
-how to react upon this using the `rejectedPolicy` and
-`callerRunsWhenRejected` option. The latter is used to easily switch
-between the two most common and recommended settings. Either let the
-current caller thread execute the task (i.e. it will become synchronous),
-but also give time for the thread pool to process its current tasks,
-without adding more tasks (self throttling). This is the default
-behavior. If setting `callerRunsWhenRejected` you use the `Abort`
-policy, which means the task is rejected, and a
-`RejectedExecutionException` is set on the xref:latest@manual:ROOT:exchange.adoc[Exchange],
-and the `Exchange` will stop continue being routed, and its `UnitOfWork` will be regarded as failed.
+The example below will add a Thread pool with a pool size of 5 threads before sending to mock:result.
+
+[source,java]
+----
+from("seda:a")
+  .threads(5)
+  .to("mock:result");
+----
-The other options `Discard` and `DiscardOldest` work a bit like
-`Abort`, however they do *not* set any exception on the
-Exchange, which means the `Exchange` will *not* be regarded as failed, but the
-`Exchange` will be successful. When using `Discard` and `DiscardOldest` then the `Exchange` will not
-continue being routed.
+And in XML DSL
-== Example
+[source,xml]
+----
+<route>
+  <from uri="seda:a"/>
+  <threads poolSize="5"/>
+  <to uri="mock:result"/>
+</route>
+----
-The example below will add a Thread pool with a pool size of 5 threads before sending to *mock:result*.
+And to use a thread pool with a task queue of only 20 elements:
 [source,java]
 ----
 from("seda:a")
-  .threads(5)
+  .threads(5).maxQueueSize(20)
   .to("mock:result");
 ----
@@ -49,7 +56,67 @@ And in XML DSL
 ----
 <route>
   <from uri="seda:a"/>
-  <threads poolSize="5"/>
+  <threads poolSize="5" maxQueueSize="20"/>
   <to uri="mock:result"/>
 </route>
 ----
+
+And you can also use a thread pool with no queue (meaning that a task cannot be pending on a queue):
+
+[source,java]
+----
+from("seda:a")
+  .threads(5).maxQueueSize(0)
+  .to("mock:result");
+----
+
+And in XML DSL
+
+[source,xml]
+----
+<route>
+  <from uri="seda:a"/>
+  <threads poolSize="5" maxQueueSize="0"/>
+  <to uri="mock:result"/>
+</route>
+----
+
+=== About rejected tasks
+
+The Threads EIP uses a thread pool which has a worker queue for tasks.
+When the worker queue gets full, the task is rejected.
+
+You can customize how to react upon this using the `rejectedPolicy` and
+`callerRunsWhenRejected` options. The latter is used to easily switch
+between the two most common and recommended settings. Either let the
+current caller thread execute the task (i.e. it will become synchronous),
+but also give time for the thread pool to process its current tasks,
+without adding more tasks (self throttling). This is the default
+behavior.
+
+The `Abort` policy, means the task is rejected, and a
+`RejectedExecutionException` is thrown.
+
+The other options `Discard` and `DiscardOldest`
+is *deprecated* and should not be used.
+
+=== Default values
+
+The Threads EIP uses the default values from the default xref:latest@manual:ROOT:threading-model.adoc[Thread Pool Profile].
+If the profile has not been altered, then the default profile are as follows:
+
+[width="100%",cols="25%,25%,50%",options="header",]
+|===
+| Option | Default | Description
+| *poolSize* | `10` | Sets the default core pool size (threads to keep minimum in pool)
+| *keepAliveTime* | `60` | Sets the default keep alive time (in seconds) for inactive threads
+| *maxPoolSize* | `20` | Sets the default maximum pool size
+| *maxQueueSize* | `1000` | Sets the default maximum number of tasks in the work queue. Use -1 for an unbounded queue.
+| *allowCoreThreadTimeOut* | `true` | Sets default whether to allow core threads to timeout
+| *rejectedPolicy* | `CallerRuns` | Sets the default handler for tasks which cannot be executed by the thread pool. Has four options:
+`Abort, CallerRuns, Discard, DiscardOldest` which corresponds to the same four options provided out of the box in the JDK.
+|===
+
+=== See Also
+
+See xref:latest@manual:ROOT:threading-model.adoc[Threading Model]
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/ThreadsMaxQueueSizeTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/ThreadsMaxQueueSizeTest.java
index b4d65ce..c0455bd 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/ThreadsMaxQueueSizeTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/ThreadsMaxQueueSizeTest.java
@@ -46,8 +46,7 @@ public class ThreadsMaxQueueSizeTest extends ContextTestSupport {
             @Override
             public void configure() throws Exception {
                 from("direct:start")
-                        // will use a a custom thread pool with 5 in core and 10 as
-                        // max
+                        // will use a custom thread pool with 5 in core and 10 as max
                         // and a max task queue with 2000
                         .threads(5, 10).maxQueueSize(2000).to("mock:result");

diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/ThreadsMaxQueueSizeTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/ThreadsZeroMaxQueueSizeTest.java
similarity index 83%
copy from core/camel-core/src/test/java/org/apache/camel/processor/ThreadsMaxQueueSizeTest.java
copy to core/camel-core/src/test/java/org/apache/camel/processor/ThreadsZeroMaxQueueSizeTest.java
index b4d65ce..1262dd1 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/ThreadsMaxQueueSizeTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/ThreadsZeroMaxQueueSizeTest.java
@@ -20,7 +20,7 @@ import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 import org.junit.jupiter.api.Test;

-public class ThreadsMaxQueueSizeTest extends ContextTestSupport {
+public class ThreadsZeroMaxQueueSizeTest extends ContextTestSupport {

     @Test
     public void testThreadsMaxQueueSize() throws Exception {
@@ -46,14 +46,13 @@ public class ThreadsMaxQueueSizeTest extends ContextTestSupport {
             @Override
             public void configure() throws Exception {
                 from("direct:start")
-                        // will use a a custom thread pool with 5 in core and 10 as
-                        // max
-                        // and a max task queue with 2000
-                        .threads(5, 10).maxQueueSize(2000).to("mock:result");
+                        // will use a custom thread pool with 5 in core and 10 as max
+                        // and no task queue
+                        .threads(5, 10).maxQueueSize(0).to("mock:result");

                 from("direct:foo")
                         // using the builder style
-                        .threads().poolSize(5).maxPoolSize(10).maxQueueSize(2000).threadName("myPool").to("mock:result");
+                        .threads().poolSize(5).maxPoolSize(10).maxQueueSize(0).threadName("myPool").to("mock:result");
             }
         };
     }
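
Note on the rejection policies documented in this commit: the new docs say the `rejectedPolicy` options correspond to the four handlers that ship with the JDK. The sketch below uses only `java.util.concurrent` (no Camel classes) to show how `CallerRuns` and `Abort` differ when a pool has no task queue. It assumes that a zero-size queue behaves like a JDK `SynchronousQueue` hand-off; how Camel builds its pool internally is not shown in this commit. The class name `RejectedPolicyDemo` and the helper `run` are hypothetical and exist only for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: plain JDK code, not part of Apache Camel.
class RejectedPolicyDemo {

    /**
     * Submits `tasks` tasks to a fixed pool of `poolSize` threads that has no queue.
     * Pool threads block until all tasks have been submitted, so every task beyond
     * `poolSize` is handed to the rejection handler.
     * Returns {tasks run on the caller thread, tasks rejected with an exception}.
     */
    static int[] run(RejectedExecutionHandler handler, int poolSize, int tasks) {
        // Assumption: SynchronousQueue stands in for "maxQueueSize = 0".
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), handler);
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger onCaller = new AtomicInteger();
        AtomicInteger rejected = new AtomicInteger();
        Thread caller = Thread.currentThread();

        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> {
                    if (Thread.currentThread() == caller) {
                        // CallerRuns: the submitting thread executes the task itself
                        onCaller.incrementAndGet();
                    } else {
                        try {
                            release.await(); // keep the pool thread busy
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                });
            } catch (RejectedExecutionException e) {
                // Abort: the task is refused and the caller sees an exception
                rejected.incrementAndGet();
            }
        }

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {onCaller.get(), rejected.get()};
    }

    public static void main(String[] args) {
        int[] callerRuns = run(new ThreadPoolExecutor.CallerRunsPolicy(), 2, 5);
        int[] abort = run(new ThreadPoolExecutor.AbortPolicy(), 2, 5);
        System.out.println("CallerRuns: onCaller=" + callerRuns[0] + " rejected=" + callerRuns[1]);
        System.out.println("Abort:      onCaller=" + abort[0] + " rejected=" + abort[1]);
    }
}
```

With two pool threads and five tasks, three tasks cannot be handed off. Under `CallerRuns` they execute on the submitting thread, which is the self-throttling behavior the docs describe; under `Abort` the submitter gets a `RejectedExecutionException` for each of them.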