This is an automated email from the ASF dual-hosted git repository. jeremyross pushed a commit to branch camel-3.11.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.11.x by this push: new 9d7177d CAMEL-16877: camel-salesforce: Fix timeouts... 9d7177d is described below commit 9d7177dc65b3db0fa733ce181fd04ab3ec5c3443 Author: Jeremy Ross <jeremy.g.r...@gmail.com> AuthorDate: Wed Sep 15 11:05:55 2021 -0500 CAMEL-16877: camel-salesforce: Fix timeouts... occurring in synchronous/transacted routes. --- .../camel/catalog/docs/salesforce-component.adoc | 4 ++- .../salesforce/SalesforceComponentConfigurer.java | 12 +++++++ .../services/org/apache/camel/component.properties | 2 +- .../camel/component/salesforce/salesforce.json | 4 ++- .../src/main/docs/salesforce-component.adoc | 4 ++- .../component/salesforce/SalesforceComponent.java | 37 +++++++++++++++++++--- .../component/salesforce/SalesforceHttpClient.java | 34 ++++++++++++++++---- .../internal/client/AbstractClientBase.java | 20 +++++++----- .../salesforce/RestApiIntegrationTest.java | 16 ++++++++++ .../internal/client/AbstractClientBaseTest.java | 4 +++ .../modules/ROOT/pages/salesforce-component.adoc | 4 ++- 11 files changed, 117 insertions(+), 24 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/salesforce-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/salesforce-component.adoc index 4418871..e6f2856 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/salesforce-component.adoc +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/salesforce-component.adoc @@ -824,7 +824,7 @@ for details on how to generate the DTO. == Options // component options: START -The Salesforce component supports 84 options, which are listed below. +The Salesforce component supports 86 options, which are listed below. @@ -881,6 +881,8 @@ The Salesforce component supports 84 options, which are listed below. | *config* (common) | Global endpoint configuration - use to set values that are common to all endpoints | | SalesforceEndpointConfig | *httpClientProperties* (common) | Used to set any properties that can be configured on the underlying HTTP client. Have a look at properties of SalesforceHttpClient and the Jetty HttpClient for all available options. | | Map | *longPollingTransportProperties* (common) | Used to set any properties that can be configured on the LongPollingTransport used by the BayeuxClient (CometD) used by the streaming api | | Map +| *workerPoolMaxSize* (common) | Maximum size of the thread pool used to handle HTTP responses. | 20 | int +| *workerPoolSize* (common) | Size of the thread pool used to handle HTTP responses. | 10 | int | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean | *allOrNone* (producer) | Composite API option to indicate to rollback all records if any are not successful. | false | boolean | *apexUrl* (producer) | APEX method URL | | String diff --git a/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceComponentConfigurer.java b/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceComponentConfigurer.java index 281478f..84be268 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceComponentConfigurer.java +++ b/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceComponentConfigurer.java @@ -190,6 +190,10 @@ public class SalesforceComponentConfigurer extends PropertyConfigurerSupport imp case "useGlobalSslContextParameters": target.setUseGlobalSslContextParameters(property(camelContext, boolean.class, value)); return true; case "username": case "userName": target.setUserName(property(camelContext, java.lang.String.class, value)); return true; + case "workerpoolmaxsize": + case "workerPoolMaxSize": target.setWorkerPoolMaxSize(property(camelContext, int.class, value)); return true; + case "workerpoolsize": + case "workerPoolSize": target.setWorkerPoolSize(property(camelContext, int.class, value)); return true; default: return false; } } @@ -359,6 +363,10 @@ public class SalesforceComponentConfigurer extends PropertyConfigurerSupport imp case "useGlobalSslContextParameters": return boolean.class; case "username": case "userName": return java.lang.String.class; + case "workerpoolmaxsize": + case "workerPoolMaxSize": return int.class; + case "workerpoolsize": + case "workerPoolSize": return int.class; default: return null; } } @@ -529,6 +537,10 @@ public class SalesforceComponentConfigurer extends PropertyConfigurerSupport imp case "useGlobalSslContextParameters": return target.isUseGlobalSslContextParameters(); case "username": case "userName": return target.getUserName(); + case "workerpoolmaxsize": + case "workerPoolMaxSize": return target.getWorkerPoolMaxSize(); + case "workerpoolsize": + case "workerPoolSize": return target.getWorkerPoolSize(); default: return null; } } diff --git a/components/camel-salesforce/camel-salesforce-component/src/generated/resources/META-INF/services/org/apache/camel/component.properties b/components/camel-salesforce/camel-salesforce-component/src/generated/resources/META-INF/services/org/apache/camel/component.properties index a8ec3e5..c7fd0c3 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/generated/resources/META-INF/services/org/apache/camel/component.properties +++ b/components/camel-salesforce/camel-salesforce-component/src/generated/resources/META-INF/services/org/apache/camel/component.properties @@ -2,6 +2,6 @@ components=salesforce groupId=org.apache.camel artifactId=camel-salesforce -version=3.11.2-SNAPSHOT +version=3.11.3-SNAPSHOT projectName=Camel :: Salesforce projectDescription=Camel Salesforce support diff --git a/components/camel-salesforce/camel-salesforce-component/src/generated/resources/org/apache/camel/component/salesforce/salesforce.json b/components/camel-salesforce/camel-salesforce-component/src/generated/resources/org/apache/camel/component/salesforce/salesforce.json index 92a5374..05a9f09 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/generated/resources/org/apache/camel/component/salesforce/salesforce.json +++ b/components/camel-salesforce/camel-salesforce-component/src/generated/resources/org/apache/camel/component/salesforce/salesforce.json @@ -11,7 +11,7 @@ "supportLevel": "Stable", "groupId": "org.apache.camel", "artifactId": "camel-salesforce", - "version": "3.11.2-SNAPSHOT", + "version": "3.11.3-SNAPSHOT", "scheme": "salesforce", "extendsScheme": "", "syntax": "salesforce:operationName:topicName", @@ -73,6 +73,8 @@ "config": { "kind": "property", "displayName": "Config", "group": "common (advanced)", "label": "common,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "deprecated": false, "autowired": false, "secret": false, "description": "Global endpoint configuration - use to set values that are common to all endpoints" }, "httpClientProperties": { "kind": "property", "displayName": "Http Client Properties", "group": "common (advanced)", "label": "common,advanced", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "deprecated": false, "autowired": false, "secret": false, "description": "Used to set any properties that can be configured on the underlying HTTP client. Have a look at properties of SalesforceHttpClient and the Jetty HttpClient for all ava [...] "longPollingTransportProperties": { "kind": "property", "displayName": "Long Polling Transport Properties", "group": "common (advanced)", "label": "common,advanced", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "deprecated": false, "autowired": false, "secret": false, "description": "Used to set any properties that can be configured on the LongPollingTransport used by the BayeuxClient (CometD) used by the streaming api" }, + "workerPoolMaxSize": { "kind": "property", "displayName": "Worker Pool Max Size", "group": "common (advanced)", "label": "common,advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 20, "description": "Maximum size of the thread pool used to handle HTTP responses." }, + "workerPoolSize": { "kind": "property", "displayName": "Worker Pool Size", "group": "common (advanced)", "label": "common,advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, "description": "Size of the thread pool used to handle HTTP responses." }, "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a me [...] "allOrNone": { "kind": "property", "displayName": "All Or None", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "config", "description": "Composite API option to indicate to rollback all records if any are not successful." }, "apexUrl": { "kind": "property", "displayName": "Apex Url", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "config", "description": "APEX method URL" }, diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc b/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc index 4418871..e6f2856 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc +++ b/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc @@ -824,7 +824,7 @@ for details on how to generate the DTO. == Options // component options: START -The Salesforce component supports 84 options, which are listed below. +The Salesforce component supports 86 options, which are listed below. @@ -881,6 +881,8 @@ The Salesforce component supports 84 options, which are listed below. | *config* (common) | Global endpoint configuration - use to set values that are common to all endpoints | | SalesforceEndpointConfig | *httpClientProperties* (common) | Used to set any properties that can be configured on the underlying HTTP client. Have a look at properties of SalesforceHttpClient and the Jetty HttpClient for all available options. | | Map | *longPollingTransportProperties* (common) | Used to set any properties that can be configured on the LongPollingTransport used by the BayeuxClient (CometD) used by the streaming api | | Map +| *workerPoolMaxSize* (common) | Maximum size of the thread pool used to handle HTTP responses. | 20 | int +| *workerPoolSize* (common) | Size of the thread pool used to handle HTTP responses. | 10 | int | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean | *allOrNone* (producer) | Composite API option to indicate to rollback all records if any are not successful. | false | boolean | *apexUrl* (producer) | APEX method URL | | String diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java index e7b082c..c42df68 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java @@ -183,6 +183,13 @@ public class SalesforceComponent extends DefaultComponent implements SSLContextP label = "common,advanced") private Map<String, Object> httpClientProperties; + @Metadata(description = "Size of the thread pool used to handle HTTP responses.", + label = "common,advanced", defaultValue = "10") + private int workerPoolSize = 10; + @Metadata(description = "Maximum size of the thread pool used to handle HTTP responses.", + label = "common,advanced", defaultValue = "20") + private int workerPoolMaxSize = 20; + @Metadata(description = "Used to set any properties that can be configured on the LongPollingTransport used by the" + " BayeuxClient (CometD) used by the streaming api", label = "common,advanced") @@ -383,7 +390,7 @@ public class SalesforceComponent extends DefaultComponent implements SSLContextP final SslContextFactory sslContextFactory = new SslContextFactory(); sslContextFactory.setSslContext(contextParameters.createSSLContext(getCamelContext())); - httpClient = createHttpClient(sslContextFactory); + httpClient = createHttpClient(this, sslContextFactory, getCamelContext(), workerPoolSize, workerPoolMaxSize); if (config != null) { config.setHttpClient(httpClient); } @@ -716,6 +723,22 @@ public class SalesforceComponent extends DefaultComponent implements SSLContextP this.httpProxyUseDigestAuth = httpProxyUseDigestAuth; } + public int getWorkerPoolSize() { + return workerPoolSize; + } + + public void setWorkerPoolSize(int workerPoolSize) { + this.workerPoolSize = workerPoolSize; + } + + public int getWorkerPoolMaxSize() { + return workerPoolMaxSize; + } + + public void setWorkerPoolMaxSize(int workerPoolMaxSize) { + this.workerPoolMaxSize = workerPoolMaxSize; + } + public String getPackages() { return packages; } @@ -781,7 +804,8 @@ public class SalesforceComponent extends DefaultComponent implements SSLContextP final SslContextFactory sslContextFactory = new SslContextFactory(); sslContextFactory.setSslContext(sslContextParameters.createSSLContext(camelContext)); - final SalesforceHttpClient httpClient = createHttpClient(sslContextFactory); + final SalesforceHttpClient httpClient + = createHttpClient("SalesforceComponent", sslContextFactory, camelContext, 10, 20); setupHttpClient(httpClient, camelContext, properties); final SalesforceSession session = new SalesforceSession(camelContext, httpClient, httpClient.getTimeout(), loginConfig); @@ -794,10 +818,15 @@ public class SalesforceComponent extends DefaultComponent implements SSLContextP return new DefaultRawClient(httpClient, "", session, loginConfig); } - static SalesforceHttpClient createHttpClient(final SslContextFactory sslContextFactory) throws Exception { + static SalesforceHttpClient createHttpClient( + Object source, final SslContextFactory sslContextFactory, final CamelContext context, int workerPoolSize, + int workerPoolMaxSize) { SecurityUtils.adaptToIBMCipherNames(sslContextFactory); - final SalesforceHttpClient httpClient = new SalesforceHttpClient(sslContextFactory); + final SalesforceHttpClient httpClient = new SalesforceHttpClient( + context, context.getExecutorServiceManager().newThreadPool(source, "SalesforceHttpClient", workerPoolSize, + workerPoolMaxSize), + sslContextFactory); // default settings, use httpClientProperties to set other // properties httpClient.setConnectTimeout(CONNECTION_TIMEOUT); diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java index 0fce4bb..e12403c 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java @@ -19,13 +19,15 @@ package org.apache.camel.component.salesforce; import java.lang.reflect.Method; import java.net.URI; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.apache.camel.CamelContext; import org.apache.camel.component.salesforce.internal.SalesforceSession; import org.apache.camel.component.salesforce.internal.client.SalesforceHttpRequest; import org.apache.camel.component.salesforce.internal.client.SalesforceSecurityHandler; import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.client.HttpClientTransport; import org.eclipse.jetty.client.HttpConversation; import org.eclipse.jetty.client.HttpRequest; import org.eclipse.jetty.client.ProtocolHandler; @@ -33,8 +35,6 @@ import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP; import org.eclipse.jetty.util.ssl.SslContextFactory; -import static java.util.Optional.ofNullable; - /** * Custom Salesforce HTTP Client that creates {@link SalesforceHttpRequest} requests. */ @@ -45,6 +45,7 @@ public class SalesforceHttpClient extends HttpClient { private static final int DEFAULT_MAX_RETRIES = 3; private static final int DEFAULT_MAX_CONTENT_LENGTH = 4 * 1024 * 1024; + private final CamelContext camelContext; private SalesforceSession session; private int maxRetries = DEFAULT_MAX_RETRIES; @@ -52,19 +53,22 @@ public class SalesforceHttpClient extends HttpClient { private long timeout = DEFAULT_TIMEOUT; private final Method addProtocolHandlerMethod; - private final Method getProtocolHandlersMethod; + private final ExecutorService workerPool; + public SalesforceHttpClient() { this(null); } public SalesforceHttpClient(SslContextFactory sslContextFactory) { - this(null, sslContextFactory); + this(null, Executors.newCachedThreadPool(), sslContextFactory); } - public SalesforceHttpClient(HttpClientTransport transport, SslContextFactory sslContextFactory) { - super(ofNullable(transport).orElse(new HttpClientTransportOverHTTP()), sslContextFactory); + public SalesforceHttpClient(CamelContext context, ExecutorService workerPool, SslContextFactory sslContextFactory) { + super(new HttpClientTransportOverHTTP(), sslContextFactory); + this.workerPool = workerPool; + this.camelContext = context; // Jetty 9.3, as opposed to 9.2 the way to add ProtocolHandler to // HttpClient changed in 9.2 HttpClient::getProtocolHandlers returned @@ -111,6 +115,18 @@ public class SalesforceHttpClient extends HttpClient { super.doStart(); } + @Override + protected void doStop() throws Exception { + if (workerPool != null) { + if (camelContext != null) { + camelContext.getExecutorServiceManager().shutdownGraceful(workerPool); + } else { + workerPool.shutdown(); + } + } + super.doStop(); + } + public SalesforceSession getSession() { return session; } @@ -142,4 +158,8 @@ public class SalesforceHttpClient extends HttpClient { public void setTimeout(long timeout) { this.timeout = timeout; } + + public ExecutorService getWorkerPool() { + return workerPool; + } } diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java index da043d8..c2c535c 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java @@ -202,12 +202,14 @@ public abstract class AbstractClientBase extends ServiceSupport // from SalesforceSecurityHandler Throwable failure = result.getFailure(); if (failure instanceof SalesforceException) { - callback.onResponse(null, headers, (SalesforceException) failure); + httpClient.getWorkerPool() + .execute(() -> callback.onResponse(null, headers, (SalesforceException) failure)); } else { final String msg = String.format("Unexpected error {%s:%s} executing {%s:%s}", response.getStatus(), response.getReason(), request.getMethod(), request.getURI()); - callback.onResponse(null, headers, new SalesforceException(msg, response.getStatus(), failure)); + httpClient.getWorkerPool().execute(() -> callback.onResponse(null, headers, + new SalesforceException(msg, response.getStatus(), failure))); } } else { @@ -226,14 +228,14 @@ public abstract class AbstractClientBase extends ServiceSupport session.parseLoginResponse(contentResponse, getContentAsString()); final String msg = String.format("Unexpected Error {%s:%s} executing {%s:%s}", status, response.getReason(), request.getMethod(), request.getURI()); - callback.onResponse(null, headers, new SalesforceException(msg, null)); - + httpClient.getWorkerPool() + .execute(() -> callback.onResponse(null, headers, new SalesforceException(msg, null))); } catch (SalesforceException e) { final String msg = String.format("Error {%s:%s} executing {%s:%s}", status, response.getReason(), request.getMethod(), request.getURI()); - callback.onResponse(null, headers, new SalesforceException(msg, response.getStatus(), e)); - + httpClient.getWorkerPool().execute(() -> callback.onResponse(null, headers, + new SalesforceException(msg, response.getStatus(), e))); } } else if (status < HttpStatus.OK_200 || status >= HttpStatus.MULTIPLE_CHOICES_300) { // Salesforce HTTP failure! @@ -241,11 +243,13 @@ public abstract class AbstractClientBase extends ServiceSupport // for APIs that return body on status 400, such as // Composite API we need content as well - callback.onResponse(getContentAsInputStream(), headers, exception); + httpClient.getWorkerPool() + .execute(() -> callback.onResponse(getContentAsInputStream(), headers, exception)); } else { // Success!!! - callback.onResponse(getContentAsInputStream(), headers, null); + httpClient.getWorkerPool() + .execute(() -> callback.onResponse(getContentAsInputStream(), headers, null)); } } } finally { diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/RestApiIntegrationTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/RestApiIntegrationTest.java index 68dd5b9..27899a6 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/RestApiIntegrationTest.java +++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/RestApiIntegrationTest.java @@ -471,6 +471,12 @@ public class RestApiIntegrationTest extends AbstractSalesforceTestBase { } @Test + public void querySyncAsyncDoesntTimeout() throws Exception { + final Object result = template.requestBody("direct:querySyncAsync", ""); + assertNotNull(result); + } + + @Test public void testParentRelationshipQuery() throws Exception { try { createAccountAndContact(); @@ -752,6 +758,16 @@ public class RestApiIntegrationTest extends AbstractSalesforceTestBase { .to("salesforce:queryAll?sObjectQuery=SELECT name from Line_Item__c&sObjectClass=" + QueryRecordsLine_Item__c.class.getName() + "&format=" + format); + from("direct:querySyncAsync") + .to("direct:querySync") + .to("direct:queryAsync"); + + from("direct:querySync?synchronous=false").routeId("r.querySync") + .to("salesforce:query?rawPayload=true&sObjectQuery=Select Id From Contact Where Name = 'Sync'"); + + from("direct:queryAsync?synchronous=true").routeId("r.queryAsync") + .to("salesforce:query?rawPayload=true&sObjectQuery=Select Id From Contact Where Name = 'Sync'"); + // testSearch from("direct:search").to("salesforce:search?sObjectSearch=FIND {Wee}&format=" + format); diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBaseTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBaseTest.java index 83df29d..af35dfe 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBaseTest.java +++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBaseTest.java @@ -19,6 +19,7 @@ package org.apache.camel.component.salesforce.internal.client; import java.io.InputStream; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; @@ -149,6 +150,9 @@ public class AbstractClientBaseTest { when(conversation.getAttribute(SalesforceSecurityHandler.AUTHENTICATION_REQUEST_ATTRIBUTE)) .thenReturn(salesforceRequest); + final ExecutorService executor = mock(ExecutorService.class); + when(client.httpClient.getWorkerPool()).thenReturn(executor); + // completes the request listener.getValue().onComplete(result); diff --git a/docs/components/modules/ROOT/pages/salesforce-component.adoc b/docs/components/modules/ROOT/pages/salesforce-component.adoc index 1a19e43..bdae45f 100644 --- a/docs/components/modules/ROOT/pages/salesforce-component.adoc +++ b/docs/components/modules/ROOT/pages/salesforce-component.adoc @@ -826,7 +826,7 @@ for details on how to generate the DTO. == Options // component options: START -The Salesforce component supports 84 options, which are listed below. +The Salesforce component supports 86 options, which are listed below. @@ -883,6 +883,8 @@ The Salesforce component supports 84 options, which are listed below. | *config* (common) | Global endpoint configuration - use to set values that are common to all endpoints | | SalesforceEndpointConfig | *httpClientProperties* (common) | Used to set any properties that can be configured on the underlying HTTP client. Have a look at properties of SalesforceHttpClient and the Jetty HttpClient for all available options. | | Map | *longPollingTransportProperties* (common) | Used to set any properties that can be configured on the LongPollingTransport used by the BayeuxClient (CometD) used by the streaming api | | Map +| *workerPoolMaxSize* (common) | Maximum size of the thread pool used to handle HTTP responses. | 20 | int +| *workerPoolSize* (common) | Size of the thread pool used to handle HTTP responses. | 10 | int | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean | *allOrNone* (producer) | Composite API option to indicate to rollback all records if any are not successful. | false | boolean | *apexUrl* (producer) | APEX method URL | | String