This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch sf
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 6052389c1a162ade71ad1a817d01830fb6303334
Author: Claus Ibsen <[email protected]>
AuthorDate: Sat Oct 11 11:20:04 2025 +0200

    CAMEL-22332: camel-salesforce - Add worker thread pool for consumers.
---
 .../camel/catalog/components/salesforce.json       | 25 +++---
 .../salesforce/SalesforceEndpointConfigurer.java   | 18 +++++
 .../salesforce/SalesforceEndpointUriFactory.java   |  5 +-
 .../camel/component/salesforce/salesforce.json     | 25 +++---
 .../src/main/docs/salesforce-component.adoc        |  9 +++
 .../component/salesforce/PubSubApiConsumer.java    | 28 ++++++-
 .../component/salesforce/SalesforceEndpoint.java   | 49 +++++++++++-
 .../component/salesforce/StreamingApiConsumer.java | 25 +++++-
 .../dsl/SalesforceEndpointBuilderFactory.java      | 92 ++++++++++++++++++++++
 9 files changed, 248 insertions(+), 28 deletions(-)

diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/salesforce.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/salesforce.json
index 14e8b7454d6e..2641b5448c8c 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/salesforce.json
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/salesforce.json
@@ -213,16 +213,19 @@
     "bridgeErrorHandler": { "index": 55, "kind": "parameter", "displayName": 
"Bridge Error Handler", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Allows for bridging the consumer to the 
Camel routing Error Handler, which mean any exceptions (if possible) occurred 
while the Camel consumer is trying to pickup incoming [...]
     "exceptionHandler": { "index": 56, "kind": "parameter", "displayName": 
"Exception Handler", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", 
"deprecated": false, "autowired": false, "secret": false, "description": "To 
let the consumer use a custom ExceptionHandler. Notice if the option 
bridgeErrorHandler is enabled then this option is not in use. By de [...]
     "exchangePattern": { "index": 57, "kind": "parameter", "displayName": 
"Exchange Pattern", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "enum", "javaType": 
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], 
"deprecated": false, "autowired": false, "secret": false, "description": "Sets 
the exchange pattern when the consumer creates an exchange." },
-    "allOrNone": { "index": 58, "kind": "parameter", "displayName": "All Or 
None", "group": "producer", "label": "producer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Composite API option to 
indicate to rollback all records if any are not successful." },
-    "apexUrl": { "index": 59, "kind": "parameter", "displayName": "Apex Url", 
"group": "producer", "label": "producer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "APEX method URL" },
-    "compositeMethod": { "index": 60, "kind": "parameter", "displayName": 
"Composite Method", "group": "producer", "label": "producer", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Composite (raw) method." 
},
-    "eventName": { "index": 61, "kind": "parameter", "displayName": "Event 
Name", "group": "producer", "label": "producer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Name of Platform Event, 
Change Data Capture Event, custom event, etc." },
-    "eventSchemaFormat": { "index": 62, "kind": "parameter", "displayName": 
"Event Schema Format", "group": "producer", "label": "producer", "required": 
false, "type": "enum", "javaType": 
"org.apache.camel.component.salesforce.internal.dto.EventSchemaFormatEnum", 
"enum": [ "EXPANDED", "COMPACT" ], "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "descrip [...]
-    "eventSchemaId": { "index": 63, "kind": "parameter", "displayName": "Event 
Schema Id", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "The ID of the event 
schema." },
-    "rawHttpHeaders": { "index": 64, "kind": "parameter", "displayName": "Raw 
Http Headers", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Comma separated list of 
message headers to include as HTTP parameters for Raw operation." },
-    "rawMethod": { "index": 65, "kind": "parameter", "displayName": "Raw 
Method", "group": "producer", "label": "producer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "HTTP method to use for 
the Raw operation" },
-    "rawPath": { "index": 66, "kind": "parameter", "displayName": "Raw Path", 
"group": "producer", "label": "producer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "The portion of the 
endpoint URL after the domain name. E.g., 
'\/services\/data\/v52.0\/sobjects\/Accou [...]
-    "rawQueryParameters": { "index": 67, "kind": "parameter", "displayName": 
"Raw Query Parameters", "group": "producer", "label": "producer", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Comma separated list of 
message headers to include as query parameters for Raw  [...]
-    "lazyStartProducer": { "index": 68, "kind": "parameter", "displayName": 
"Lazy Start Producer", "group": "producer (advanced)", "label": 
"producer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Whether the producer should be started 
lazy (on the first message). By starting lazy you can use this to allow 
CamelContext and routes to startup in situations where a produ [...]
+    "workerPoolEnabled": { "index": 58, "kind": "parameter", "displayName": 
"Worker Pool Enabled", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Use thread-pool for processing received 
Salesforce events, for example to process events in parallel." },
+    "workerPoolMaxSize": { "index": 59, "kind": "parameter", "displayName": 
"Worker Pool Max Size", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "integer", "javaType": "int", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, 
"description": "Maximum thread pool size for consumer worker pool." },
+    "workerPoolSize": { "index": 60, "kind": "parameter", "displayName": 
"Worker Pool Size", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "integer", "javaType": "int", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, 
"description": "Minimum thread pool size for consumer worker pool." },
+    "allOrNone": { "index": 61, "kind": "parameter", "displayName": "All Or 
None", "group": "producer", "label": "producer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Composite API option to 
indicate to rollback all records if any are not successful." },
+    "apexUrl": { "index": 62, "kind": "parameter", "displayName": "Apex Url", 
"group": "producer", "label": "producer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "APEX method URL" },
+    "compositeMethod": { "index": 63, "kind": "parameter", "displayName": 
"Composite Method", "group": "producer", "label": "producer", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Composite (raw) method." 
},
+    "eventName": { "index": 64, "kind": "parameter", "displayName": "Event 
Name", "group": "producer", "label": "producer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Name of Platform Event, 
Change Data Capture Event, custom event, etc." },
+    "eventSchemaFormat": { "index": 65, "kind": "parameter", "displayName": 
"Event Schema Format", "group": "producer", "label": "producer", "required": 
false, "type": "enum", "javaType": 
"org.apache.camel.component.salesforce.internal.dto.EventSchemaFormatEnum", 
"enum": [ "EXPANDED", "COMPACT" ], "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "descrip [...]
+    "eventSchemaId": { "index": 66, "kind": "parameter", "displayName": "Event 
Schema Id", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "The ID of the event 
schema." },
+    "rawHttpHeaders": { "index": 67, "kind": "parameter", "displayName": "Raw 
Http Headers", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Comma separated list of 
message headers to include as HTTP parameters for Raw operation." },
+    "rawMethod": { "index": 68, "kind": "parameter", "displayName": "Raw 
Method", "group": "producer", "label": "producer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "HTTP method to use for 
the Raw operation" },
+    "rawPath": { "index": 69, "kind": "parameter", "displayName": "Raw Path", 
"group": "producer", "label": "producer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "The portion of the 
endpoint URL after the domain name. E.g., 
'\/services\/data\/v52.0\/sobjects\/Accou [...]
+    "rawQueryParameters": { "index": 70, "kind": "parameter", "displayName": 
"Raw Query Parameters", "group": "producer", "label": "producer", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Comma separated list of 
message headers to include as query parameters for Raw  [...]
+    "lazyStartProducer": { "index": 71, "kind": "parameter", "displayName": 
"Lazy Start Producer", "group": "producer (advanced)", "label": 
"producer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Whether the producer should be started 
lazy (on the first message). By starting lazy you can use this to allow 
CamelContext and routes to startup in situations where a produ [...]
   }
 }
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointConfigurer.java
 
b/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointConfigurer.java
index 4edb088bfa6c..733626b35de7 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointConfigurer.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointConfigurer.java
@@ -154,6 +154,12 @@ public class SalesforceEndpointConfigurer extends 
PropertyConfigurerSupport impl
         case "streamQueryResult": 
target.getConfiguration().setStreamQueryResult(property(camelContext, 
java.lang.Boolean.class, value)); return true;
         case "updatetopic":
         case "updateTopic": 
target.getConfiguration().setUpdateTopic(property(camelContext, boolean.class, 
value)); return true;
+        case "workerpoolenabled":
+        case "workerPoolEnabled": 
target.setWorkerPoolEnabled(property(camelContext, boolean.class, value)); 
return true;
+        case "workerpoolmaxsize":
+        case "workerPoolMaxSize": 
target.setWorkerPoolMaxSize(property(camelContext, int.class, value)); return 
true;
+        case "workerpoolsize":
+        case "workerPoolSize": target.setWorkerPoolSize(property(camelContext, 
int.class, value)); return true;
         default: return false;
         }
     }
@@ -292,6 +298,12 @@ public class SalesforceEndpointConfigurer extends 
PropertyConfigurerSupport impl
         case "streamQueryResult": return java.lang.Boolean.class;
         case "updatetopic":
         case "updateTopic": return boolean.class;
+        case "workerpoolenabled":
+        case "workerPoolEnabled": return boolean.class;
+        case "workerpoolmaxsize":
+        case "workerPoolMaxSize": return int.class;
+        case "workerpoolsize":
+        case "workerPoolSize": return int.class;
         default: return null;
         }
     }
@@ -431,6 +443,12 @@ public class SalesforceEndpointConfigurer extends 
PropertyConfigurerSupport impl
         case "streamQueryResult": return 
target.getConfiguration().getStreamQueryResult();
         case "updatetopic":
         case "updateTopic": return target.getConfiguration().isUpdateTopic();
+        case "workerpoolenabled":
+        case "workerPoolEnabled": return target.isWorkerPoolEnabled();
+        case "workerpoolmaxsize":
+        case "workerPoolMaxSize": return target.getWorkerPoolMaxSize();
+        case "workerpoolsize":
+        case "workerPoolSize": return target.getWorkerPoolSize();
         default: return null;
         }
     }
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointUriFactory.java
 
b/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointUriFactory.java
index 217a69701821..ab7046da2b4f 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointUriFactory.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointUriFactory.java
@@ -23,7 +23,7 @@ public class SalesforceEndpointUriFactory extends 
org.apache.camel.support.compo
     private static final Set<String> SECRET_PROPERTY_NAMES;
     private static final Map<String, String> MULTI_VALUE_PREFIXES;
     static {
-        Set<String> props = new HashSet<>(69);
+        Set<String> props = new HashSet<>(72);
         props.add("allOrNone");
         props.add("apexMethod");
         props.add("apexQueryParams");
@@ -93,6 +93,9 @@ public class SalesforceEndpointUriFactory extends 
org.apache.camel.support.compo
         props.add("streamQueryResult");
         props.add("topicName");
         props.add("updateTopic");
+        props.add("workerPoolEnabled");
+        props.add("workerPoolMaxSize");
+        props.add("workerPoolSize");
         PROPERTY_NAMES = Collections.unmodifiableSet(props);
         SECRET_PROPERTY_NAMES = Collections.emptySet();
         MULTI_VALUE_PREFIXES = Collections.emptyMap();
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/generated/resources/META-INF/org/apache/camel/component/salesforce/salesforce.json
 
b/components/camel-salesforce/camel-salesforce-component/src/generated/resources/META-INF/org/apache/camel/component/salesforce/salesforce.json
index 14e8b7454d6e..2641b5448c8c 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/generated/resources/META-INF/org/apache/camel/component/salesforce/salesforce.json
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/generated/resources/META-INF/org/apache/camel/component/salesforce/salesforce.json
@@ -213,16 +213,19 @@
     "bridgeErrorHandler": { "index": 55, "kind": "parameter", "displayName": 
"Bridge Error Handler", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Allows for bridging the consumer to the 
Camel routing Error Handler, which mean any exceptions (if possible) occurred 
while the Camel consumer is trying to pickup incoming [...]
     "exceptionHandler": { "index": 56, "kind": "parameter", "displayName": 
"Exception Handler", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", 
"deprecated": false, "autowired": false, "secret": false, "description": "To 
let the consumer use a custom ExceptionHandler. Notice if the option 
bridgeErrorHandler is enabled then this option is not in use. By de [...]
     "exchangePattern": { "index": 57, "kind": "parameter", "displayName": 
"Exchange Pattern", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "enum", "javaType": 
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], 
"deprecated": false, "autowired": false, "secret": false, "description": "Sets 
the exchange pattern when the consumer creates an exchange." },
-    "allOrNone": { "index": 58, "kind": "parameter", "displayName": "All Or 
None", "group": "producer", "label": "producer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Composite API option to 
indicate to rollback all records if any are not successful." },
-    "apexUrl": { "index": 59, "kind": "parameter", "displayName": "Apex Url", 
"group": "producer", "label": "producer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "APEX method URL" },
-    "compositeMethod": { "index": 60, "kind": "parameter", "displayName": 
"Composite Method", "group": "producer", "label": "producer", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Composite (raw) method." 
},
-    "eventName": { "index": 61, "kind": "parameter", "displayName": "Event 
Name", "group": "producer", "label": "producer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Name of Platform Event, 
Change Data Capture Event, custom event, etc." },
-    "eventSchemaFormat": { "index": 62, "kind": "parameter", "displayName": 
"Event Schema Format", "group": "producer", "label": "producer", "required": 
false, "type": "enum", "javaType": 
"org.apache.camel.component.salesforce.internal.dto.EventSchemaFormatEnum", 
"enum": [ "EXPANDED", "COMPACT" ], "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "descrip [...]
-    "eventSchemaId": { "index": 63, "kind": "parameter", "displayName": "Event 
Schema Id", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "The ID of the event 
schema." },
-    "rawHttpHeaders": { "index": 64, "kind": "parameter", "displayName": "Raw 
Http Headers", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Comma separated list of 
message headers to include as HTTP parameters for Raw operation." },
-    "rawMethod": { "index": 65, "kind": "parameter", "displayName": "Raw 
Method", "group": "producer", "label": "producer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "HTTP method to use for 
the Raw operation" },
-    "rawPath": { "index": 66, "kind": "parameter", "displayName": "Raw Path", 
"group": "producer", "label": "producer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "The portion of the 
endpoint URL after the domain name. E.g., 
'\/services\/data\/v52.0\/sobjects\/Accou [...]
-    "rawQueryParameters": { "index": 67, "kind": "parameter", "displayName": 
"Raw Query Parameters", "group": "producer", "label": "producer", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Comma separated list of 
message headers to include as query parameters for Raw  [...]
-    "lazyStartProducer": { "index": 68, "kind": "parameter", "displayName": 
"Lazy Start Producer", "group": "producer (advanced)", "label": 
"producer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Whether the producer should be started 
lazy (on the first message). By starting lazy you can use this to allow 
CamelContext and routes to startup in situations where a produ [...]
+    "workerPoolEnabled": { "index": 58, "kind": "parameter", "displayName": 
"Worker Pool Enabled", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Use thread-pool for processing received 
Salesforce events, for example to process events in parallel." },
+    "workerPoolMaxSize": { "index": 59, "kind": "parameter", "displayName": 
"Worker Pool Max Size", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "integer", "javaType": "int", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, 
"description": "Maximum thread pool size for consumer worker pool." },
+    "workerPoolSize": { "index": 60, "kind": "parameter", "displayName": 
"Worker Pool Size", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "integer", "javaType": "int", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, 
"description": "Minimum thread pool size for consumer worker pool." },
+    "allOrNone": { "index": 61, "kind": "parameter", "displayName": "All Or 
None", "group": "producer", "label": "producer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Composite API option to 
indicate to rollback all records if any are not successful." },
+    "apexUrl": { "index": 62, "kind": "parameter", "displayName": "Apex Url", 
"group": "producer", "label": "producer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "APEX method URL" },
+    "compositeMethod": { "index": 63, "kind": "parameter", "displayName": 
"Composite Method", "group": "producer", "label": "producer", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Composite (raw) method." 
},
+    "eventName": { "index": 64, "kind": "parameter", "displayName": "Event 
Name", "group": "producer", "label": "producer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Name of Platform Event, 
Change Data Capture Event, custom event, etc." },
+    "eventSchemaFormat": { "index": 65, "kind": "parameter", "displayName": 
"Event Schema Format", "group": "producer", "label": "producer", "required": 
false, "type": "enum", "javaType": 
"org.apache.camel.component.salesforce.internal.dto.EventSchemaFormatEnum", 
"enum": [ "EXPANDED", "COMPACT" ], "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "descrip [...]
+    "eventSchemaId": { "index": 66, "kind": "parameter", "displayName": "Event 
Schema Id", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "The ID of the event 
schema." },
+    "rawHttpHeaders": { "index": 67, "kind": "parameter", "displayName": "Raw 
Http Headers", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Comma separated list of 
message headers to include as HTTP parameters for Raw operation." },
+    "rawMethod": { "index": 68, "kind": "parameter", "displayName": "Raw 
Method", "group": "producer", "label": "producer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "HTTP method to use for 
the Raw operation" },
+    "rawPath": { "index": 69, "kind": "parameter", "displayName": "Raw Path", 
"group": "producer", "label": "producer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "The portion of the 
endpoint URL after the domain name. E.g., 
'\/services\/data\/v52.0\/sobjects\/Accou [...]
+    "rawQueryParameters": { "index": 70, "kind": "parameter", "displayName": 
"Raw Query Parameters", "group": "producer", "label": "producer", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Comma separated list of 
message headers to include as query parameters for Raw  [...]
+    "lazyStartProducer": { "index": 71, "kind": "parameter", "displayName": 
"Lazy Start Producer", "group": "producer (advanced)", "label": 
"producer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Whether the producer should be started 
lazy (on the first message). By starting lazy you can use this to allow 
CamelContext and routes to startup in situations where a produ [...]
   }
 }
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc
 
b/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc
index be5b55bd8088..8a160dbd1658 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc
@@ -1884,6 +1884,15 @@ 
from("salesforce:subscribe:CamelTestTopic&sObjectName=Merchandise__c")...
 
 |===
 
+===== Parallel processing received events
+
+You can set `workerPoolEnabled=true` on the salesforce endpoint to let 
Camel use a thread-pool to process
+the received events, which allows these events to be processed in parallel.
+
+NOTE: It has been reported that receiving PUSH_TOPIC events and then 
later sending a message
+to Salesforce (via a producer) on the same thread could cause Salesforce to 
block. Enabling the worker pool
+can help with this. See more in 
https://issues.apache.org/jira/browse/CAMEL-22332[CAMEL-22332].
+
 *Output*
 
 Type: Class passed via `sObjectName` parameter
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java
index e1734bbee230..0dc99bf4f0d2 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.salesforce;
 
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 
 import com.salesforce.eventbus.protobuf.ReplayPreset;
 import org.apache.camel.AsyncCallback;
@@ -39,13 +40,13 @@ public class PubSubApiConsumer extends DefaultConsumer {
     private String initialReplayId;
     private boolean fallbackToLatestReplayId;
     private final SalesforceEndpoint endpoint;
+    private ExecutorService executorService;
 
     private final int batchSize;
     private final PubSubDeserializeType deserializeType;
     private Class<?> pojoClass;
     private PubSubApiClient pubSubClient;
     private Map<String, Class<?>> eventClassMap;
-
     private boolean usePlainTextConnection = false;
 
     public PubSubApiConsumer(SalesforceEndpoint endpoint, Processor processor) 
throws ClassNotFoundException {
@@ -73,8 +74,22 @@ public class PubSubApiConsumer extends DefaultConsumer {
         in.setHeader(HEADER_SALESFORCE_PUBSUB_EVENT_ID, eventId);
         in.setHeader(HEADER_SALESFORCE_PUBSUB_REPLAY_ID, replayId);
         in.setHeader(HEADER_SALESFORCE_PUBSUB_RPC_ID, rpcId);
+
+        // use default consumer callback
         AsyncCallback cb = defaultConsumerCallback(exchange, true);
-        getAsyncProcessor().process(exchange, cb);
+        if (executorService != null) {
+            executorService.submit(() -> getAsyncProcessor().process(exchange, 
cb));
+        } else {
+            getAsyncProcessor().process(exchange, cb);
+        }
+    }
+
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
+
+    public void setExecutorService(ExecutorService executorService) {
+        this.executorService = executorService;
     }
 
     @Override
@@ -103,6 +118,15 @@ public class PubSubApiConsumer extends DefaultConsumer {
         super.doStop();
     }
 
+    @Override
+    protected void doShutdown() throws Exception {
+        super.doShutdown();
+        if (executorService != null) {
+            
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(executorService);
+            executorService = null;
+        }
+    }
+
     public String getTopic() {
         return topic;
     }
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java
index f7a47db573ab..fefb219c2fdd 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.salesforce;
 
+import java.util.concurrent.ExecutorService;
+
 import org.apache.camel.Category;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
@@ -72,6 +74,16 @@ public class SalesforceEndpoint extends DefaultEndpoint {
     @UriParam(label = "consumer", description = "The replayId value to use 
when subscribing to the Pub/Sub API.")
     private String pubSubReplayId;
 
+    @UriParam(label = "consumer,advanced",
+              description = "Use thread-pool for processing received 
Salesforce events, for example to process events in parallel.")
+    private boolean workerPoolEnabled;
+    @UriParam(label = "consumer,advanced", description = "Minimum thread 
pool-size size for consumer worker pool.",
+              defaultValue = "1")
+    private int workerPoolSize = 1;
+    @UriParam(label = "consumer,advanced", description = "Maximum thread 
pool-size size for consumer worker pool.",
+              defaultValue = "10")
+    private int workerPoolMaxSize = 10;
+
     public SalesforceEndpoint(String uri, SalesforceComponent 
salesforceComponent, SalesforceEndpointConfig configuration,
                               OperationName operationName, String topicName) {
         super(uri, salesforceComponent);
@@ -94,14 +106,23 @@ public class SalesforceEndpoint extends DefaultEndpoint {
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
+        ExecutorService executorService = null;
+        if (workerPoolEnabled) {
+            executorService = 
getCamelContext().getExecutorServiceManager().newThreadPool(this, 
"SalesforceWorkerPool",
+                    workerPoolSize, workerPoolMaxSize);
+        }
         Consumer consumer = null;
         switch (operationName) {
             case SUBSCRIBE -> {
                 final SubscriptionHelper subscriptionHelper = 
getComponent().getSubscriptionHelper();
-                consumer = new StreamingApiConsumer(this, processor, 
subscriptionHelper);
+                StreamingApiConsumer answer = new StreamingApiConsumer(this, 
processor, subscriptionHelper);
+                answer.setExecutorService(executorService);
+                consumer = answer;
             }
             case PUBSUB_SUBSCRIBE -> {
-                consumer = new PubSubApiConsumer(this, processor);
+                PubSubApiConsumer answer = new PubSubApiConsumer(this, 
processor);
+                answer.setExecutorService(executorService);
+                consumer = answer;
             }
             default -> {
                 // NO OP
@@ -144,6 +165,30 @@ public class SalesforceEndpoint extends DefaultEndpoint {
         this.pubSubReplayId = pubSubReplayId;
     }
 
+    public boolean isWorkerPoolEnabled() {
+        return workerPoolEnabled;
+    }
+
+    public void setWorkerPoolEnabled(boolean workerPoolEnabled) {
+        this.workerPoolEnabled = workerPoolEnabled;
+    }
+
+    public int getWorkerPoolSize() {
+        return workerPoolSize;
+    }
+
+    public void setWorkerPoolSize(int workerPoolSize) {
+        this.workerPoolSize = workerPoolSize;
+    }
+
+    public int getWorkerPoolMaxSize() {
+        return workerPoolMaxSize;
+    }
+
+    public void setWorkerPoolMaxSize(int workerPoolMaxSize) {
+        this.workerPoolMaxSize = workerPoolMaxSize;
+    }
+
     @Override
     protected void doStart() throws Exception {
         try {
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/StreamingApiConsumer.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/StreamingApiConsumer.java
index 4dcfd100a6e5..95a8a5e730dd 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/StreamingApiConsumer.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/StreamingApiConsumer.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.salesforce;
 import java.io.IOException;
 import java.io.StringReader;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.camel.AsyncCallback;
@@ -74,6 +75,7 @@ public class StreamingApiConsumer extends DefaultConsumer {
     private final SalesforceEndpoint endpoint;
     private final MessageKind messageKind;
     private final ObjectMapper objectMapper;
+    private ExecutorService executorService;
 
     private final boolean rawPayload;
     private Class<?> sObjectClass;
@@ -104,6 +106,14 @@ public class StreamingApiConsumer extends DefaultConsumer {
         rawPayload = endpoint.getConfiguration().isRawPayload();
     }
 
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
+
+    public void setExecutorService(ExecutorService executorService) {
+        this.executorService = executorService;
+    }
+
     public String getTopicName() {
         return topicName;
     }
@@ -141,7 +151,11 @@ public class StreamingApiConsumer extends DefaultConsumer {
 
         // use default consumer callback
         AsyncCallback cb = defaultConsumerCallback(exchange, true);
-        getAsyncProcessor().process(exchange, cb);
+        if (executorService != null) {
+            executorService.submit(() -> getAsyncProcessor().process(exchange, 
cb));
+        } else {
+            getAsyncProcessor().process(exchange, cb);
+        }
     }
 
     @SuppressWarnings("unchecked")
@@ -333,6 +347,15 @@ public class StreamingApiConsumer extends DefaultConsumer {
         }
     }
 
+    @Override
+    protected void doShutdown() throws Exception {
+        super.doShutdown();
+        if (executorService != null) {
+            
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(executorService);
+            executorService = null;
+        }
+    }
+
     // May be necessary to call from some unit tests.
     void determineSObjectClass() {
         // get sObjectClass to convert to
diff --git 
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/SalesforceEndpointBuilderFactory.java
 
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/SalesforceEndpointBuilderFactory.java
index 4971cfc96e79..d8b7b02f923b 100644
--- 
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/SalesforceEndpointBuilderFactory.java
+++ 
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/SalesforceEndpointBuilderFactory.java
@@ -1497,6 +1497,98 @@ public interface SalesforceEndpointBuilderFactory {
             doSetProperty("exchangePattern", exchangePattern);
             return this;
         }
+        /**
+         * Use thread-pool for processing received Salesforce events, for
+         * example to process events in parallel.
+         * 
+         * The option is a: <code>boolean</code> type.
+         * 
+         * Default: false
+         * Group: consumer (advanced)
+         * 
+         * @param workerPoolEnabled the value to set
+         * @return the dsl builder
+         */
+        default AdvancedSalesforceEndpointConsumerBuilder 
workerPoolEnabled(boolean workerPoolEnabled) {
+            doSetProperty("workerPoolEnabled", workerPoolEnabled);
+            return this;
+        }
+        /**
+         * Use thread-pool for processing received Salesforce events, for
+         * example to process events in parallel.
+         * 
+         * The option will be converted to a <code>boolean</code> type.
+         * 
+         * Default: false
+         * Group: consumer (advanced)
+         * 
+         * @param workerPoolEnabled the value to set
+         * @return the dsl builder
+         */
+        default AdvancedSalesforceEndpointConsumerBuilder 
workerPoolEnabled(String workerPoolEnabled) {
+            doSetProperty("workerPoolEnabled", workerPoolEnabled);
+            return this;
+        }
+        /**
+         * Maximum thread pool-size size for consumer worker pool.
+         * 
+         * The option is a: <code>int</code> type.
+         * 
+         * Default: 10
+         * Group: consumer (advanced)
+         * 
+         * @param workerPoolMaxSize the value to set
+         * @return the dsl builder
+         */
+        default AdvancedSalesforceEndpointConsumerBuilder 
workerPoolMaxSize(int workerPoolMaxSize) {
+            doSetProperty("workerPoolMaxSize", workerPoolMaxSize);
+            return this;
+        }
+        /**
+         * Maximum thread pool-size size for consumer worker pool.
+         * 
+         * The option will be converted to a <code>int</code> type.
+         * 
+         * Default: 10
+         * Group: consumer (advanced)
+         * 
+         * @param workerPoolMaxSize the value to set
+         * @return the dsl builder
+         */
+        default AdvancedSalesforceEndpointConsumerBuilder 
workerPoolMaxSize(String workerPoolMaxSize) {
+            doSetProperty("workerPoolMaxSize", workerPoolMaxSize);
+            return this;
+        }
+        /**
+         * Minimum thread pool-size size for consumer worker pool.
+         * 
+         * The option is a: <code>int</code> type.
+         * 
+         * Default: 1
+         * Group: consumer (advanced)
+         * 
+         * @param workerPoolSize the value to set
+         * @return the dsl builder
+         */
+        default AdvancedSalesforceEndpointConsumerBuilder workerPoolSize(int 
workerPoolSize) {
+            doSetProperty("workerPoolSize", workerPoolSize);
+            return this;
+        }
+        /**
+         * Minimum thread pool-size size for consumer worker pool.
+         * 
+         * The option will be converted to a <code>int</code> type.
+         * 
+         * Default: 1
+         * Group: consumer (advanced)
+         * 
+         * @param workerPoolSize the value to set
+         * @return the dsl builder
+         */
+        default AdvancedSalesforceEndpointConsumerBuilder 
workerPoolSize(String workerPoolSize) {
+            doSetProperty("workerPoolSize", workerPoolSize);
+            return this;
+        }
     }
 
     /**


Reply via email to