This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-4.10.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.10.x by this push: new e4d0d6e50dd CAMEL-21671: camel-core: Split/Multicast EIP: Sync processing in parallel mode to avoid unbounded thread use, but to respect the thread pool limits from the EIP (#17331) e4d0d6e50dd is described below commit e4d0d6e50dd059b1c7434224008c211a9fffc4e4 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon Mar 3 10:23:28 2025 +0000 CAMEL-21671: camel-core: Split/Multicast EIP: Sync processing in parallel mode to avoid unbounded thread use, but to respect the thread pool limits from the EIP (#17331) * CAMEL-21671: camel-core: Split/Multicast EIP: Sync processing in parallel mode to avoid unbounded thread use, but to respect the thread pool limits from the EIP. Thanks to Soheila Esmaeili for unit test. --- .../org/apache/camel/catalog/models/multicast.json | 2 +- .../apache/camel/catalog/models/recipientList.json | 2 +- .../org/apache/camel/catalog/models/split.json | 2 +- .../apache/camel/catalog/schemas/camel-spring.xsd | 17 ++- .../META-INF/org/apache/camel/model/multicast.json | 2 +- .../org/apache/camel/model/recipientList.json | 2 +- .../META-INF/org/apache/camel/model/split.json | 2 +- .../apache/camel/model/MulticastDefinition.java | 15 +++ .../camel/model/RecipientListDefinition.java | 15 +++ .../org/apache/camel/model/SplitDefinition.java | 15 +++ .../apache/camel/processor/MulticastProcessor.java | 19 +++- .../SplitterParallelAsyncProcessorIssueTest.java | 122 +++++++++++++++++++++ .../AsyncEndpointMulticastSynchronousTest.java | 67 +++++++++++ .../ROOT/pages/camel-4x-upgrade-guide-4_10.adoc | 13 ++- .../dsl/yaml/deserializers/ModelDeserializers.java | 6 +- .../generated/resources/schema/camelYamlDsl.json | 6 +- 16 files changed, 287 insertions(+), 20 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/multicast.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/multicast.json 
index 0d22e833805..48f563f8eff 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/multicast.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/multicast.json @@ -19,7 +19,7 @@ "aggregationStrategyMethodName": { "index": 4, "kind": "attribute", "displayName": "Aggregation Strategy Method Name", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy." }, "aggregationStrategyMethodAllowNull": { "index": 5, "kind": "attribute", "displayName": "Aggregation Strategy Method Allow Null", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If this option is false then the aggregate method is not used if there was no data to enrich. If this option is true then null values is used as the [...] "parallelAggregate": { "index": 6, "kind": "attribute", "displayName": "Parallel Aggregate", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": true, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled then the aggregate method on AggregationStrategy can be called concurrently. Notice that this would require the implementation of AggregationStrategy to be implemented as thre [...] 
- "parallelProcessing": { "index": 7, "kind": "attribute", "displayName": "Parallel Processing", "group": "common", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled then sending messages to the multicasts occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. Its only the sending and proc [...] + "parallelProcessing": { "index": 7, "kind": "attribute", "displayName": "Parallel Processing", "group": "common", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled then sending messages to the multicasts occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. Its only the sending and proc [...] "synchronous": { "index": 8, "kind": "attribute", "displayName": "Synchronous", "group": "common", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Sets whether synchronous processing should be strictly used. When enabled then the same thread is used to continue routing after the multicast is complete, even if parallel processing is enabled." }, "streaming": { "index": 9, "kind": "attribute", "displayName": "Streaming", "group": "common", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled then Camel will process replies out-of-order, eg in the order they come back. If disabled, Camel will process replies in the same order as defined by the multicast." 
}, "stopOnException": { "index": 10, "kind": "attribute", "displayName": "Stop On Exception", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will now stop further processing if an exception or failure occurred during processing of an org.apache.camel.Exchange and the caused exception will be thrown. Will also stop if processin [...] diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/recipientList.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/recipientList.json index 024240e23b3..a0248c8757b 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/recipientList.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/recipientList.json @@ -21,7 +21,7 @@ "aggregationStrategyMethodName": { "index": 6, "kind": "attribute", "displayName": "Aggregation Strategy Method Name", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy." }, "aggregationStrategyMethodAllowNull": { "index": 7, "kind": "attribute", "displayName": "Aggregation Strategy Method Allow Null", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If this option is false then the aggregate method is not used if there was no data to enrich. If this option is true then null values is used as the [...] 
"parallelAggregate": { "index": 8, "kind": "attribute", "displayName": "Parallel Aggregate", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": true, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled then the aggregate method on AggregationStrategy can be called concurrently. Notice that this would require the implementation of AggregationStrategy to be implemented as thre [...] - "parallelProcessing": { "index": 9, "kind": "attribute", "displayName": "Parallel Processing", "group": "common", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled then sending messages to the recipients occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. Its only the sending and proc [...] + "parallelProcessing": { "index": 9, "kind": "attribute", "displayName": "Parallel Processing", "group": "common", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled then sending messages to the recipients occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. Its only the sending and proc [...] "synchronous": { "index": 10, "kind": "attribute", "displayName": "Synchronous", "group": "common", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Sets whether synchronous processing should be strictly used. When enabled then the same thread is used to continue routing after the recipient list is complete, even if parallel processing is enabled." 
}, "timeout": { "index": 11, "kind": "attribute", "displayName": "Timeout", "group": "common", "required": false, "type": "duration", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "0", "description": "Sets a total timeout specified in millis, when using parallel processing. If the Recipient List hasn't been able to send and process all replies within the given timeframe, then the timeout triggers and the Recipient List breaks o [...] "executorService": { "index": 12, "kind": "attribute", "displayName": "Executor Service", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "java.util.concurrent.ExecutorService", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom Thread Pool to be used for parallel processing. Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well." }, diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/split.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/split.json index 816bb7504f5..f62ad40e890 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/split.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/split.json @@ -21,7 +21,7 @@ "aggregationStrategyMethodName": { "index": 6, "kind": "attribute", "displayName": "Aggregation Strategy Method Name", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy." 
}, "aggregationStrategyMethodAllowNull": { "index": 7, "kind": "attribute", "displayName": "Aggregation Strategy Method Allow Null", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If this option is false then the aggregate method is not used if there was no data to enrich. If this option is true then null values is used as the [...] "parallelAggregate": { "index": 8, "kind": "attribute", "displayName": "Parallel Aggregate", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": true, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled then the aggregate method on AggregationStrategy can be called concurrently. Notice that this would require the implementation of AggregationStrategy to be implemented as thre [...] - "parallelProcessing": { "index": 9, "kind": "attribute", "displayName": "Parallel Processing", "group": "common", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled then processing each split messages occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. It's only processing the sub mess [...] + "parallelProcessing": { "index": 9, "kind": "attribute", "displayName": "Parallel Processing", "group": "common", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled then processing each split messages occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. It's only processing the sub mess [...] 
"synchronous": { "index": 10, "kind": "attribute", "displayName": "Synchronous", "group": "common", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Sets whether synchronous processing should be strictly used. When enabled then the same thread is used to continue routing after the split is complete, even if parallel processing is enabled." }, "streaming": { "index": 11, "kind": "attribute", "displayName": "Streaming", "group": "common", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "When in streaming mode, then the splitter splits the original message on-demand, and each split message is processed one by one. This reduces memory usage as the splitter do not split all the messages first, but then we do n [...] "stopOnException": { "index": 12, "kind": "attribute", "displayName": "Stop On Exception", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will now stop further processing if an exception or failure occurred during processing of an org.apache.camel.Exchange and the caused exception will be thrown. Will also stop if processin [...] diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd index 06008c197e7..438dd6ef9d1 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd @@ -10952,7 +10952,11 @@ If enabled then sending messages to the multicasts occurs concurrently. 
Note the messages has been fully processed, before it continues. Its only the sending and processing the replies from the multicasts which happens concurrently. When parallel processing is enabled, then the Camel routing engin will continue processing using last used thread from the parallel thread pool. However, if you want to use the original thread that -called the multicast, then make sure to enable the synchronous option as well. Default value: false +called the multicast, then make sure to enable the synchronous option as well. In parallel processing mode, you may want +to also synchronous = true to force this EIP to process the sub-tasks using the upper bounds of the thread-pool. If +using synchronous = false then Camel will allow its reactive routing engine to use as many threads as possible, which +may be available due to sub-tasks using other thread-pools such as CompletableFuture.runAsync or others. Default value: +false ]]> </xs:documentation> </xs:annotation> @@ -11992,7 +11996,11 @@ If enabled then sending messages to the recipients occurs concurrently. Note the messages has been fully processed, before it continues. Its only the sending and processing the replies from the recipients which happens concurrently. When parallel processing is enabled, then the Camel routing engin will continue processing using last used thread from the parallel thread pool. However, if you want to use the original thread that -called the recipient list, then make sure to enable the synchronous option as well. Default value: false +called the recipient list, then make sure to enable the synchronous option as well. In parallel processing mode, you may +want to also synchronous = true to force this EIP to process the sub-tasks using the upper bounds of the thread-pool. 
If +using synchronous = false then Camel will allow its reactive routing engine to use as many threads as possible, which +may be available due to sub-tasks using other thread-pools such as CompletableFuture.runAsync or others. Default value: +false ]]> </xs:documentation> </xs:annotation> @@ -13619,7 +13627,10 @@ If enabled then processing each split messages occurs concurrently. Note the cal messages has been fully processed, before it continues. It's only processing the sub messages from the splitter which happens concurrently. When parallel processing is enabled, then the Camel routing engin will continue processing using last used thread from the parallel thread pool. However, if you want to use the original thread that called the -splitter, then make sure to enable the synchronous option as well. Default value: false +splitter, then make sure to enable the synchronous option as well. In parallel processing mode, you may want to also +synchronous = true to force this EIP to process the sub-tasks using the upper bounds of the thread-pool. If using +synchronous = false then Camel will allow its reactive routing engine to use as many threads as possible, which may be +available due to sub-tasks using other thread-pools such as CompletableFuture.runAsync or others. 
Default value: false ]]> </xs:documentation> </xs:annotation> diff --git a/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/multicast.json b/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/multicast.json index 0d22e833805..48f563f8eff 100644 --- a/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/multicast.json +++ b/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/multicast.json @@ -19,7 +19,7 @@ "aggregationStrategyMethodName": { "index": 4, "kind": "attribute", "displayName": "Aggregation Strategy Method Name", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy." }, "aggregationStrategyMethodAllowNull": { "index": 5, "kind": "attribute", "displayName": "Aggregation Strategy Method Allow Null", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If this option is false then the aggregate method is not used if there was no data to enrich. If this option is true then null values is used as the [...] "parallelAggregate": { "index": 6, "kind": "attribute", "displayName": "Parallel Aggregate", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": true, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled then the aggregate method on AggregationStrategy can be called concurrently. Notice that this would require the implementation of AggregationStrategy to be implemented as thre [...] 
- "parallelProcessing": { "index": 7, "kind": "attribute", "displayName": "Parallel Processing", "group": "common", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled then sending messages to the multicasts occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. Its only the sending and proc [...] + "parallelProcessing": { "index": 7, "kind": "attribute", "displayName": "Parallel Processing", "group": "common", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled then sending messages to the multicasts occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. Its only the sending and proc [...] "synchronous": { "index": 8, "kind": "attribute", "displayName": "Synchronous", "group": "common", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Sets whether synchronous processing should be strictly used. When enabled then the same thread is used to continue routing after the multicast is complete, even if parallel processing is enabled." }, "streaming": { "index": 9, "kind": "attribute", "displayName": "Streaming", "group": "common", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled then Camel will process replies out-of-order, eg in the order they come back. If disabled, Camel will process replies in the same order as defined by the multicast." 
}, "stopOnException": { "index": 10, "kind": "attribute", "displayName": "Stop On Exception", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will now stop further processing if an exception or failure occurred during processing of an org.apache.camel.Exchange and the caused exception will be thrown. Will also stop if processin [...] diff --git a/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/recipientList.json b/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/recipientList.json index 024240e23b3..a0248c8757b 100644 --- a/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/recipientList.json +++ b/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/recipientList.json @@ -21,7 +21,7 @@ "aggregationStrategyMethodName": { "index": 6, "kind": "attribute", "displayName": "Aggregation Strategy Method Name", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy." }, "aggregationStrategyMethodAllowNull": { "index": 7, "kind": "attribute", "displayName": "Aggregation Strategy Method Allow Null", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If this option is false then the aggregate method is not used if there was no data to enrich. If this option is true then null values is used as the [...] 
"parallelAggregate": { "index": 8, "kind": "attribute", "displayName": "Parallel Aggregate", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": true, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled then the aggregate method on AggregationStrategy can be called concurrently. Notice that this would require the implementation of AggregationStrategy to be implemented as thre [...] - "parallelProcessing": { "index": 9, "kind": "attribute", "displayName": "Parallel Processing", "group": "common", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled then sending messages to the recipients occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. Its only the sending and proc [...] + "parallelProcessing": { "index": 9, "kind": "attribute", "displayName": "Parallel Processing", "group": "common", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled then sending messages to the recipients occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. Its only the sending and proc [...] "synchronous": { "index": 10, "kind": "attribute", "displayName": "Synchronous", "group": "common", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Sets whether synchronous processing should be strictly used. When enabled then the same thread is used to continue routing after the recipient list is complete, even if parallel processing is enabled." 
}, "timeout": { "index": 11, "kind": "attribute", "displayName": "Timeout", "group": "common", "required": false, "type": "duration", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "0", "description": "Sets a total timeout specified in millis, when using parallel processing. If the Recipient List hasn't been able to send and process all replies within the given timeframe, then the timeout triggers and the Recipient List breaks o [...] "executorService": { "index": 12, "kind": "attribute", "displayName": "Executor Service", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "java.util.concurrent.ExecutorService", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom Thread Pool to be used for parallel processing. Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well." }, diff --git a/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/split.json b/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/split.json index 816bb7504f5..f62ad40e890 100644 --- a/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/split.json +++ b/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/split.json @@ -21,7 +21,7 @@ "aggregationStrategyMethodName": { "index": 6, "kind": "attribute", "displayName": "Aggregation Strategy Method Name", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy." 
}, "aggregationStrategyMethodAllowNull": { "index": 7, "kind": "attribute", "displayName": "Aggregation Strategy Method Allow Null", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If this option is false then the aggregate method is not used if there was no data to enrich. If this option is true then null values is used as the [...] "parallelAggregate": { "index": 8, "kind": "attribute", "displayName": "Parallel Aggregate", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": true, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled then the aggregate method on AggregationStrategy can be called concurrently. Notice that this would require the implementation of AggregationStrategy to be implemented as thre [...] - "parallelProcessing": { "index": 9, "kind": "attribute", "displayName": "Parallel Processing", "group": "common", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled then processing each split messages occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. It's only processing the sub mess [...] + "parallelProcessing": { "index": 9, "kind": "attribute", "displayName": "Parallel Processing", "group": "common", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled then processing each split messages occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. It's only processing the sub mess [...] 
"synchronous": { "index": 10, "kind": "attribute", "displayName": "Synchronous", "group": "common", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Sets whether synchronous processing should be strictly used. When enabled then the same thread is used to continue routing after the split is complete, even if parallel processing is enabled." }, "streaming": { "index": 11, "kind": "attribute", "displayName": "Streaming", "group": "common", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "When in streaming mode, then the splitter splits the original message on-demand, and each split message is processed one by one. This reduces memory usage as the splitter do not split all the messages first, but then we do n [...] "stopOnException": { "index": 12, "kind": "attribute", "displayName": "Stop On Exception", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will now stop further processing if an exception or failure occurred during processing of an org.apache.camel.Exchange and the caused exception will be thrown. Will also stop if processin [...] diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/MulticastDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/MulticastDefinition.java index 6f5c5276e2c..49d6ec063f8 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/MulticastDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/MulticastDefinition.java @@ -208,6 +208,11 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> * from the parallel thread pool. 
However, if you want to use the original thread that called the multicast, then * make sure to enable the synchronous option as well. * + * In parallel processing mode, you may also set synchronous = true to force this EIP to process the sub-tasks + * using the upper bounds of the thread-pool. If using synchronous = false then Camel will allow its reactive + * routing engine to use as many threads as possible, which may be available due to sub-tasks using other + * thread-pools such as CompletableFuture.runAsync or others. + * * @return the builder */ public MulticastDefinition parallelProcessing() { @@ -224,6 +229,11 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> * from the parallel thread pool. However, if you want to use the original thread that called the multicast, then * make sure to enable the synchronous option as well. * + * In parallel processing mode, you may also set synchronous = true to force this EIP to process the sub-tasks + * using the upper bounds of the thread-pool. If using synchronous = false then Camel will allow its reactive + * routing engine to use as many threads as possible, which may be available due to sub-tasks using other + * thread-pools such as CompletableFuture.runAsync or others. + * * @return the builder */ public MulticastDefinition parallelProcessing(String parallelProcessing) { @@ -240,6 +250,11 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> * from the parallel thread pool. However, if you want to use the original thread that called the multicast, then * make sure to enable the synchronous option as well. * + * In parallel processing mode, you may also set synchronous = true to force this EIP to process the sub-tasks + * using the upper bounds of the thread-pool.
If using synchronous = false then Camel will allow its reactive + * routing engine to use as many threads as possible, which may be available due to sub-tasks using other + * thread-pools such as CompletableFuture.runAsync or others. + * * @return the builder */ public MulticastDefinition parallelProcessing(boolean parallelProcessing) { diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/RecipientListDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/RecipientListDefinition.java index cb394224180..e253e8490d4 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/RecipientListDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/RecipientListDefinition.java @@ -232,6 +232,11 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext * from the parallel thread pool. However, if you want to use the original thread that called the recipient list, * then make sure to enable the synchronous option as well. * + * In parallel processing mode, you may also set synchronous = true to force this EIP to process the sub-tasks + * using the upper bounds of the thread-pool. If using synchronous = false then Camel will allow its reactive + * routing engine to use as many threads as possible, which may be available due to sub-tasks using other + * thread-pools such as CompletableFuture.runAsync or others. + * * @return the builder */ public RecipientListDefinition<Type> parallelProcessing() { @@ -248,6 +253,11 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext * from the parallel thread pool. However, if you want to use the original thread that called the recipient list, * then make sure to enable the synchronous option as well. * + * In parallel processing mode, you may also set synchronous = true to force this EIP to process the sub-tasks + * using the upper bounds of the thread-pool.
If using synchronous = false then Camel will allow its reactive + * routing engine to use as many threads as possible, which may be available due to sub-tasks using other + * thread-pools such as CompletableFuture.runAsync or others. + * * @return the builder */ public RecipientListDefinition<Type> parallelProcessing(String parallelProcessing) { @@ -264,6 +274,11 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext * from the parallel thread pool. However, if you want to use the original thread that called the recipient list, * then make sure to enable the synchronous option as well. * + * In parallel processing mode, you may also set synchronous = true to force this EIP to process the sub-tasks + * using the upper bounds of the thread-pool. If using synchronous = false then Camel will allow its reactive + * routing engine to use as many threads as possible, which may be available due to sub-tasks using other + * thread-pools such as CompletableFuture.runAsync or others. + * * @return the builder */ public RecipientListDefinition<Type> parallelProcessing(boolean parallelProcessing) { diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/SplitDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/SplitDefinition.java index 193a91ba96d..3934f60447b 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/SplitDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/SplitDefinition.java @@ -229,6 +229,11 @@ public class SplitDefinition extends OutputExpressionNode implements ExecutorSer * from the parallel thread pool. However, if you want to use the original thread that called the splitter, then * make sure to enable the synchronous option as well. * + * In parallel processing mode, you may also set synchronous = true to force this EIP to process the sub-tasks + * using the upper bounds of the thread-pool.
If using synchronous = false then Camel will allow its reactive + * routing engine to use as many threads as possible, which may be available due to sub-tasks using other + * thread-pools such as CompletableFuture.runAsync or others. + * * @return the builder */ public SplitDefinition parallelProcessing() { @@ -244,6 +249,11 @@ public class SplitDefinition extends OutputExpressionNode implements ExecutorSer * from the parallel thread pool. However, if you want to use the original thread that called the splitter, then * make sure to enable the synchronous option as well. * + * In parallel processing mode, you may also set synchronous = true to force this EIP to process the sub-tasks + * using the upper bounds of the thread-pool. If using synchronous = false then Camel will allow its reactive + * routing engine to use as many threads as possible, which may be available due to sub-tasks using other + * thread-pools such as CompletableFuture.runAsync or others. + * * @return the builder */ public SplitDefinition parallelProcessing(boolean parallelProcessing) { @@ -259,6 +269,11 @@ public class SplitDefinition extends OutputExpressionNode implements ExecutorSer * from the parallel thread pool. However, if you want to use the original thread that called the splitter, then * make sure to enable the synchronous option as well. * + * In parallel processing mode, you may also set synchronous = true to force this EIP to process the sub-tasks + * using the upper bounds of the thread-pool. If using synchronous = false then Camel will allow its reactive + * routing engine to use as many threads as possible, which may be available due to sub-tasks using other + * thread-pools such as CompletableFuture.runAsync or others.
+ * * @return the builder */ public SplitDefinition parallelProcessing(String parallelProcessing) { diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java index 3fd2143fdd8..7ec13021502 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -604,8 +604,7 @@ public class MulticastProcessor extends AsyncProcessorSupport // compute time taken if sending to another endpoint StopWatch watch = beforeSend(pair); - AsyncProcessor async = AsyncProcessorConverterHelper.convert(pair.getProcessor()); - async.process(exchange, doneSync -> { + AsyncCallback taskCallback = (doneSync) -> { afterSend(pair, watch); // Decide whether to continue with the multicast or not; similar logic to the Pipeline @@ -640,7 +639,21 @@ public class MulticastProcessor extends AsyncProcessorSupport if (hasNext && !isParallelProcessing()) { schedule(this); } - }); + }; + + AsyncProcessor async = AsyncProcessorConverterHelper.convert(pair.getProcessor()); + if (synchronous) { + // force synchronous processing using await manager + // to restrict total number of threads to be bound by the thread-pool of this EIP, + // as otherwise in case of async processing then other thread pools can cause + // unbounded thread use that cannot be controlled by Camel + awaitManager.process(async, exchange); + taskCallback.done(true); + } else { + // async processing in reactive-mode which can use as many threads as possible + // if the downstream processors are async and use different threads + async.process(exchange, taskCallback); + } }); // after submitting this pair then move on to the next pair (if in parallel mode) if (hasNext && isParallelProcessing()) { diff --git 
a/core/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelAsyncProcessorIssueTest.java b/core/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelAsyncProcessorIssueTest.java new file mode 100644 index 00000000000..fbc2331ddcd --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelAsyncProcessorIssueTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.issues; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.*; + +import org.apache.camel.*; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.spi.ThreadPoolProfile; +import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class SplitterParallelAsyncProcessorIssueTest extends ContextTestSupport { + + private final Set<String> threads = new HashSet<>(); + + @Test + public void testSplitParallelProcessingMaxThreads() throws Exception { + getMockEndpoint("mock:split").expectedMessageCount(10); + + String xmlBody = "<employees>" + + "<employee><id>1</id><name>John</name></employee>" + + "<employee><id>2</id><name>Jane</name></employee>" + + "<employee><id>3</id><name>Jim</name></employee>" + + "<employee><id>4</id><name>Jack</name></employee>" + + "<employee><id>5</id><name>Jill</name></employee>" + + "<employee><id>6</id><name>opi</name></employee>" + + "<employee><id>7</id><name>ds</name></employee>" + + "<employee><id>8</id><name>hhh</name></employee>" + + "<employee><id>9</id><name>fki</name></employee>" + + "<employee><id>10</id><name>abc</name></employee>" + + "</employees> "; + + template.sendBody("direct:start", xmlBody); + + assertMockEndpointsSatisfied(); + + log.info("{} Threads in use: {}", threads.size(), threads); + + // 2 from split EIP and 2 from the async processor that uses the JDK ForkJoinPool + Assertions.assertTrue(threads.size() <= 4, "Should not use more than 4 threads, was: " + threads.size()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // only use 2 threads at-most + ThreadPoolProfile myThreadPoolProfile = new ThreadPoolProfile("threadPoolProfile"); + myThreadPoolProfile.setMaxPoolSize(2); + myThreadPoolProfile.setPoolSize(2); +
myThreadPoolProfile.setMaxQueueSize(2); + myThreadPoolProfile.setRejectedPolicy(ThreadPoolRejectedPolicy.Abort); + + getContext().getExecutorServiceManager().setDefaultThreadPoolProfile(myThreadPoolProfile); + + AsyncProcessor asyncProcessor = new AsyncProcessor() { + @Override + public void process(Exchange exchange) throws Exception { + } + + @Override + public boolean process(Exchange exchange, AsyncCallback callback) { + // Run the processing in a separate thread + CompletableFuture.runAsync(() -> { + threads.add(Thread.currentThread().getName()); + try { + // Simulate some asynchronous processing + Thread.sleep(250); + exchange.getIn().setBody("Processed asynchronously"); + } catch (InterruptedException e) { + exchange.setException(e); + } finally { + // Signal completion + callback.done(false); + } + }); + + // Return false to indicate that processing is not complete yet + return false; + } + + @Override + public CompletableFuture<Exchange> processAsync(Exchange exchange) { + return null; + } + }; + + from("direct:start") + .split() + .xpath("/employees/employee") + .parallelProcessing().stopOnException().timeout(10000).executorService("threadPoolProfile").synchronous() + .process(e -> threads.add(Thread.currentThread().getName())) + .process(asyncProcessor) + .process(e -> threads.add(Thread.currentThread().getName())) + .delay(250) + .end() + .to("mock:split"); + } + }; + } + +} diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointMulticastSynchronousTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointMulticastSynchronousTest.java new file mode 100644 index 00000000000..8f07e18aa1b --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointMulticastSynchronousTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.async; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class AsyncEndpointMulticastSynchronousTest extends ContextTestSupport { + + private static String beforeThreadName; + private static String afterThreadName; + + @Test + public void testAsyncEndpoint() { + getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel"); + getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel"); + getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel"); + + String reply = template.requestBody("direct:start", "Hello Camel", String.class); + assertEquals("Bye Camel", reply); + + assertTrue(beforeThreadName.equalsIgnoreCase(afterThreadName), "Should use same threads"); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + context.addComponent("async", new MyAsyncComponent()); + + from("direct:start").to("mock:before").to("log:before").process(new Processor() { + public 
void process(Exchange exchange) { + beforeThreadName = Thread.currentThread().getName(); + } + }).multicast().synchronous().to("async:hi:moon", "direct:foo").end().process(new Processor() { + public void process(Exchange exchange) { + afterThreadName = Thread.currentThread().getName(); + } + }).to("log:after").to("mock:after").to("mock:result"); + + from("direct:foo").setBody(constant("Bye Camel")); + } + }; + } + +} diff --git a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_10.adoc b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_10.adoc index d35d0e50dca..9127eb867c8 100644 --- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_10.adoc +++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_10.adoc @@ -4,8 +4,17 @@ This document is for helping you upgrade your Apache Camel application from Camel 4.x to 4.y. For example, if you are upgrading Camel 4.0 to 4.2, then you should follow the guides from both 4.0 to 4.1 and 4.1 to 4.2. -<<<<<<< HEAD -======= +== Upgrading from 4.10.1 to 4.10.2 + +=== Recipient List, Split and Multicast EIP + +In parallel processing mode, you can also enable `synchronous=true` to force these EIPs to process +the sub-tasks using the upper bounds of the thread-pool. If using `synchronous=false` then Camel +will allow its reactive routing engine to use as many threads as possible, which may be available +due to sub-tasks using other thread-pools such as `CompletableFuture.runAsync` or others. + +Setting `synchronous=true` is the same behaviour as in Camel 2 which did not have the reactive routing engine.
+ == Upgrading from 4.10.0 to 4.10.1 === camel-api diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java index 5fbdc56df6f..ea018fb0655 100644 --- a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java +++ b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java @@ -10136,7 +10136,7 @@ public final class ModelDeserializers extends YamlDeserializerSupport { @YamlProperty(name = "id", type = "string", description = "Sets the id of this node", displayName = "Id"), @YamlProperty(name = "onPrepare", type = "string", description = "Uses the Processor when preparing the org.apache.camel.Exchange to be send. This can be used to deep-clone messages that should be send, or any custom logic needed before the exchange is send.", displayName = "On Prepare"), @YamlProperty(name = "parallelAggregate", type = "boolean", deprecated = true, description = "If enabled then the aggregate method on AggregationStrategy can be called concurrently. Notice that this would require the implementation of AggregationStrategy to be implemented as thread-safe. By default this is false meaning that Camel synchronizes the call to the aggregate method. Though in some use-cases this can be used to archive higher performance when the Aggregation [...] - @YamlProperty(name = "parallelProcessing", type = "boolean", description = "If enabled then sending messages to the multicasts occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. Its only the sending and processing the replies from the multicasts which happens concurrently. 
When parallel processing is enabled, then the Camel routing engin will continue processing using last used thread from the [...] + @YamlProperty(name = "parallelProcessing", type = "boolean", description = "If enabled then sending messages to the multicasts occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. Its only the sending and processing the replies from the multicasts which happens concurrently. When parallel processing is enabled, then the Camel routing engin will continue processing using last used thread from the [...] @YamlProperty(name = "shareUnitOfWork", type = "boolean", description = "Shares the org.apache.camel.spi.UnitOfWork with the parent and each of the sub messages. Multicast will by default not share unit of work between the parent exchange and each multicasted exchange. This means each sub exchange has its own individual unit of work.", displayName = "Share Unit Of Work"), @YamlProperty(name = "steps", type = "array:org.apache.camel.model.ProcessorDefinition"), @YamlProperty(name = "stopOnException", type = "boolean", description = "Will now stop further processing if an exception or failure occurred during processing of an org.apache.camel.Exchange and the caused exception will be thrown. Will also stop if processing the exchange failed (has a fault message) or an exception was thrown and handled by the error handler (such as using onException). In all situations the multicast will stop further processing. This is the same [...] 
@@ -13030,7 +13030,7 @@ public final class ModelDeserializers extends YamlDeserializerSupport { @YamlProperty(name = "ignoreInvalidEndpoints", type = "boolean", description = "Ignore the invalidate endpoint exception when try to create a producer with that endpoint", displayName = "Ignore Invalid Endpoints"), @YamlProperty(name = "onPrepare", type = "string", description = "Uses the Processor when preparing the org.apache.camel.Exchange to be used send. This can be used to deep-clone messages that should be send, or any custom logic needed before the exchange is send.", displayName = "On Prepare"), @YamlProperty(name = "parallelAggregate", type = "boolean", deprecated = true, description = "If enabled then the aggregate method on AggregationStrategy can be called concurrently. Notice that this would require the implementation of AggregationStrategy to be implemented as thread-safe. By default this is false meaning that Camel synchronizes the call to the aggregate method. Though in some use-cases this can be used to archive higher performance when the Aggregation [...] - @YamlProperty(name = "parallelProcessing", type = "boolean", description = "If enabled then sending messages to the recipients occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. Its only the sending and processing the replies from the recipients which happens concurrently. When parallel processing is enabled, then the Camel routing engin will continue processing using last used thread from the [...] + @YamlProperty(name = "parallelProcessing", type = "boolean", description = "If enabled then sending messages to the recipients occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. Its only the sending and processing the replies from the recipients which happens concurrently. 
When parallel processing is enabled, then the Camel routing engin will continue processing using last used thread from the [...] @YamlProperty(name = "shareUnitOfWork", type = "boolean", description = "Shares the org.apache.camel.spi.UnitOfWork with the parent and each of the sub messages. Recipient List will by default not share unit of work between the parent exchange and each recipient exchange. This means each sub exchange has its own individual unit of work.", displayName = "Share Unit Of Work"), @YamlProperty(name = "stopOnException", type = "boolean", description = "Will now stop further processing if an exception or failure occurred during processing of an org.apache.camel.Exchange and the caused exception will be thrown. Will also stop if processing the exchange failed (has a fault message) or an exception was thrown and handled by the error handler (such as using onException). In all situations the recipient list will stop further processing. This is the [...] @YamlProperty(name = "streaming", type = "boolean", description = "If enabled then Camel will process replies out-of-order, eg in the order they come back. If disabled, Camel will process replies in the same order as defined by the recipient list.", displayName = "Streaming"), @@ -17422,7 +17422,7 @@ public final class ModelDeserializers extends YamlDeserializerSupport { @YamlProperty(name = "id", type = "string", description = "Sets the id of this node", displayName = "Id"), @YamlProperty(name = "onPrepare", type = "string", description = "Uses the Processor when preparing the org.apache.camel.Exchange to be sent. This can be used to deep-clone messages that should be sent, or any custom logic needed before the exchange is sent.", displayName = "On Prepare"), @YamlProperty(name = "parallelAggregate", type = "boolean", deprecated = true, description = "If enabled then the aggregate method on AggregationStrategy can be called concurrently. 
Notice that this would require the implementation of AggregationStrategy to be implemented as thread-safe. By default this is false meaning that Camel synchronizes the call to the aggregate method. Though in some use-cases this can be used to archive higher performance when the Aggregation [...] - @YamlProperty(name = "parallelProcessing", type = "boolean", description = "If enabled then processing each split messages occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. It's only processing the sub messages from the splitter which happens concurrently. When parallel processing is enabled, then the Camel routing engin will continue processing using last used thread from the parallel thread [...] + @YamlProperty(name = "parallelProcessing", type = "boolean", description = "If enabled then processing each split messages occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. It's only processing the sub messages from the splitter which happens concurrently. When parallel processing is enabled, then the Camel routing engin will continue processing using last used thread from the parallel thread [...] @YamlProperty(name = "shareUnitOfWork", type = "boolean", description = "Shares the org.apache.camel.spi.UnitOfWork with the parent and each of the sub messages. Splitter will by default not share unit of work between the parent exchange and each split exchange. This means each split exchange has its own individual unit of work.", displayName = "Share Unit Of Work"), @YamlProperty(name = "steps", type = "array:org.apache.camel.model.ProcessorDefinition"), @YamlProperty(name = "stopOnException", type = "boolean", description = "Will now stop further processing if an exception or failure occurred during processing of an org.apache.camel.Exchange and the caused exception will be thrown. 
Will also stop if processing the exchange failed (has a fault message) or an exception was thrown and handled by the error handler (such as using onException). In all situations the splitter will stop further processing. This is the same b [...] diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json index 7deecd56aaf..28d783304bc 100644 --- a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json +++ b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json @@ -3383,7 +3383,7 @@ "parallelProcessing" : { "type" : "boolean", "title" : "Parallel Processing", - "description" : "If enabled then sending messages to the multicasts occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. Its only the sending and processing the replies from the multicasts which happens concurrently. When parallel processing is enabled, then the Camel routing engin will continue processing using last used thread from the parallel thread pool. However, if you want to use the original thre [...] + "description" : "If enabled then sending messages to the multicasts occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. Its only the sending and processing the replies from the multicasts which happens concurrently. When parallel processing is enabled, then the Camel routing engin will continue processing using last used thread from the parallel thread pool. However, if you want to use the original thre [...] }, "shareUnitOfWork" : { "type" : "boolean", @@ -4484,7 +4484,7 @@ "parallelProcessing" : { "type" : "boolean", "title" : "Parallel Processing", - "description" : "If enabled then sending messages to the recipients occurs concurrently. 
Note the caller thread will still wait until all messages has been fully processed, before it continues. Its only the sending and processing the replies from the recipients which happens concurrently. When parallel processing is enabled, then the Camel routing engin will continue processing using last used thread from the parallel thread pool. However, if you want to use the original thre [...] + "description" : "If enabled then sending messages to the recipients occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. Its only the sending and processing the replies from the recipients which happens concurrently. When parallel processing is enabled, then the Camel routing engin will continue processing using last used thread from the parallel thread pool. However, if you want to use the original thre [...] }, "shareUnitOfWork" : { "type" : "boolean", @@ -6794,7 +6794,7 @@ "parallelProcessing" : { "type" : "boolean", "title" : "Parallel Processing", - "description" : "If enabled then processing each split messages occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. It's only processing the sub messages from the splitter which happens concurrently. When parallel processing is enabled, then the Camel routing engin will continue processing using last used thread from the parallel thread pool. However, if you want to use the original thread that called t [...] + "description" : "If enabled then processing each split messages occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. It's only processing the sub messages from the splitter which happens concurrently. When parallel processing is enabled, then the Camel routing engin will continue processing using last used thread from the parallel thread pool. 
However, if you want to use the original thread that called t [...] }, "shareUnitOfWork" : { "type" : "boolean",