This is an automated email from the ASF dual-hosted git repository.
jeremyross pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 38346ba CAMEL-16877: camel-salesforce: Fix timeouts...
38346ba is described below
commit 38346ba49fb4f1dcb5fec138a622f83b908b6602
Author: Jeremy Ross <[email protected]>
AuthorDate: Wed Sep 15 11:05:55 2021 -0500
CAMEL-16877: camel-salesforce: Fix timeouts...
occurring in synchronous/transacted routes.
---
.../component/salesforce/SalesforceComponent.java | 37 +++++++++++++++++++---
.../component/salesforce/SalesforceHttpClient.java | 34 ++++++++++++++++----
.../internal/client/AbstractClientBase.java | 20 +++++++-----
.../salesforce/RestApiIntegrationTest.java | 16 ++++++++++
.../internal/client/AbstractClientBaseTest.java | 4 +++
5 files changed, 92 insertions(+), 19 deletions(-)
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
index a4fcd33..855194e 100644
---
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
+++
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
@@ -188,6 +188,13 @@ public class SalesforceComponent extends DefaultComponent
implements SSLContextP
label = "common,advanced")
private Map<String, Object> httpClientProperties;
+ @Metadata(description = "Size of the thread pool used to handle HTTP
responses.",
+ label = "common,advanced", defaultValue = "10")
+ private int workerPoolSize = 10;
+ @Metadata(description = "Maximum size of the thread pool used to handle
HTTP responses.",
+ label = "common,advanced", defaultValue = "20")
+ private int workerPoolMaxSize = 20;
+
@Metadata(description = "Used to set any properties that can be configured
on the LongPollingTransport used by the"
+ " BayeuxClient (CometD) used by the streaming
api",
label = "common,advanced")
@@ -389,7 +396,7 @@ public class SalesforceComponent extends DefaultComponent
implements SSLContextP
final SslContextFactory sslContextFactory = new
SslContextFactory();
sslContextFactory.setSslContext(contextParameters.createSSLContext(getCamelContext()));
- httpClient = createHttpClient(sslContextFactory);
+ httpClient = createHttpClient(this, sslContextFactory,
getCamelContext(), workerPoolSize, workerPoolMaxSize);
if (config != null) {
config.setHttpClient(httpClient);
}
@@ -730,6 +737,22 @@ public class SalesforceComponent extends DefaultComponent
implements SSLContextP
this.httpProxyUseDigestAuth = httpProxyUseDigestAuth;
}
+ public int getWorkerPoolSize() {
+ return workerPoolSize;
+ }
+
+ public void setWorkerPoolSize(int workerPoolSize) {
+ this.workerPoolSize = workerPoolSize;
+ }
+
+ public int getWorkerPoolMaxSize() {
+ return workerPoolMaxSize;
+ }
+
+ public void setWorkerPoolMaxSize(int workerPoolMaxSize) {
+ this.workerPoolMaxSize = workerPoolMaxSize;
+ }
+
public String getPackages() {
return packages;
}
@@ -795,7 +818,8 @@ public class SalesforceComponent extends DefaultComponent
implements SSLContextP
final SslContextFactory sslContextFactory = new SslContextFactory();
sslContextFactory.setSslContext(sslContextParameters.createSSLContext(camelContext));
- final SalesforceHttpClient httpClient =
createHttpClient(sslContextFactory);
+ final SalesforceHttpClient httpClient
+ = createHttpClient("SalesforceComponent", sslContextFactory,
camelContext, 10, 20);
setupHttpClient(httpClient, camelContext, properties);
final SalesforceSession session = new SalesforceSession(camelContext,
httpClient, httpClient.getTimeout(), loginConfig);
@@ -808,10 +832,15 @@ public class SalesforceComponent extends DefaultComponent
implements SSLContextP
return new DefaultRawClient(httpClient, "", session, loginConfig);
}
- static SalesforceHttpClient createHttpClient(final SslContextFactory
sslContextFactory) {
+ static SalesforceHttpClient createHttpClient(
+ Object source, final SslContextFactory sslContextFactory, final
CamelContext context, int workerPoolSize,
+ int workerPoolMaxSize) {
SecurityUtils.adaptToIBMCipherNames(sslContextFactory);
- final SalesforceHttpClient httpClient = new
SalesforceHttpClient(sslContextFactory);
+ final SalesforceHttpClient httpClient = new SalesforceHttpClient(
+ context,
context.getExecutorServiceManager().newThreadPool(source,
"SalesforceHttpClient", workerPoolSize,
+ workerPoolMaxSize),
+ sslContextFactory);
// default settings, use httpClientProperties to set other
// properties
httpClient.setConnectTimeout(CONNECTION_TIMEOUT);
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java
index 0fce4bb..e12403c 100644
---
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java
+++
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java
@@ -19,13 +19,15 @@ package org.apache.camel.component.salesforce;
import java.lang.reflect.Method;
import java.net.URI;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import org.apache.camel.CamelContext;
import org.apache.camel.component.salesforce.internal.SalesforceSession;
import
org.apache.camel.component.salesforce.internal.client.SalesforceHttpRequest;
import
org.apache.camel.component.salesforce.internal.client.SalesforceSecurityHandler;
import org.eclipse.jetty.client.HttpClient;
-import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.HttpConversation;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.ProtocolHandler;
@@ -33,8 +35,6 @@ import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.util.ssl.SslContextFactory;
-import static java.util.Optional.ofNullable;
-
/**
* Custom Salesforce HTTP Client that creates {@link SalesforceHttpRequest}
requests.
*/
@@ -45,6 +45,7 @@ public class SalesforceHttpClient extends HttpClient {
private static final int DEFAULT_MAX_RETRIES = 3;
private static final int DEFAULT_MAX_CONTENT_LENGTH = 4 * 1024 * 1024;
+ private final CamelContext camelContext;
private SalesforceSession session;
private int maxRetries = DEFAULT_MAX_RETRIES;
@@ -52,19 +53,22 @@ public class SalesforceHttpClient extends HttpClient {
private long timeout = DEFAULT_TIMEOUT;
private final Method addProtocolHandlerMethod;
-
private final Method getProtocolHandlersMethod;
+ private final ExecutorService workerPool;
+
public SalesforceHttpClient() {
this(null);
}
public SalesforceHttpClient(SslContextFactory sslContextFactory) {
- this(null, sslContextFactory);
+ this(null, Executors.newCachedThreadPool(), sslContextFactory);
}
- public SalesforceHttpClient(HttpClientTransport transport,
SslContextFactory sslContextFactory) {
- super(ofNullable(transport).orElse(new HttpClientTransportOverHTTP()),
sslContextFactory);
+ public SalesforceHttpClient(CamelContext context, ExecutorService
workerPool, SslContextFactory sslContextFactory) {
+ super(new HttpClientTransportOverHTTP(), sslContextFactory);
+ this.workerPool = workerPool;
+ this.camelContext = context;
// Jetty 9.3, as opposed to 9.2 the way to add ProtocolHandler to
// HttpClient changed in 9.2 HttpClient::getProtocolHandlers returned
@@ -111,6 +115,18 @@ public class SalesforceHttpClient extends HttpClient {
super.doStart();
}
+ @Override
+ protected void doStop() throws Exception {
+ if (workerPool != null) {
+ if (camelContext != null) {
+
camelContext.getExecutorServiceManager().shutdownGraceful(workerPool);
+ } else {
+ workerPool.shutdown();
+ }
+ }
+ super.doStop();
+ }
+
public SalesforceSession getSession() {
return session;
}
@@ -142,4 +158,8 @@ public class SalesforceHttpClient extends HttpClient {
public void setTimeout(long timeout) {
this.timeout = timeout;
}
+
+ public ExecutorService getWorkerPool() {
+ return workerPool;
+ }
}
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
index 07d95cd..1b9b221 100644
---
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
+++
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
@@ -200,12 +200,14 @@ public abstract class AbstractClientBase extends
ServiceSupport
// from SalesforceSecurityHandler
Throwable failure = result.getFailure();
if (failure instanceof SalesforceException) {
- callback.onResponse(null, headers,
(SalesforceException) failure);
+ httpClient.getWorkerPool()
+ .execute(() -> callback.onResponse(null,
headers, (SalesforceException) failure));
} else {
final String msg = String.format("Unexpected error
{%s:%s} executing {%s:%s}", response.getStatus(),
response.getReason(), request.getMethod(),
request.getURI());
- callback.onResponse(null, headers, new
SalesforceException(msg, response.getStatus(), failure));
+ httpClient.getWorkerPool().execute(() ->
callback.onResponse(null, headers,
+ new SalesforceException(msg,
response.getStatus(), failure)));
}
} else {
@@ -224,14 +226,14 @@ public abstract class AbstractClientBase extends
ServiceSupport
session.parseLoginResponse(contentResponse,
getContentAsString());
final String msg = String.format("Unexpected
Error {%s:%s} executing {%s:%s}", status,
response.getReason(),
request.getMethod(), request.getURI());
- callback.onResponse(null, headers, new
SalesforceException(msg, null));
-
+ httpClient.getWorkerPool()
+ .execute(() ->
callback.onResponse(null, headers, new SalesforceException(msg, null)));
} catch (SalesforceException e) {
final String msg = String.format("Error
{%s:%s} executing {%s:%s}", status,
response.getReason(),
request.getMethod(), request.getURI());
- callback.onResponse(null, headers, new
SalesforceException(msg, response.getStatus(), e));
-
+ httpClient.getWorkerPool().execute(() ->
callback.onResponse(null, headers,
+ new SalesforceException(msg,
response.getStatus(), e)));
}
} else if (status < HttpStatus.OK_200 || status >=
HttpStatus.MULTIPLE_CHOICES_300) {
// Salesforce HTTP failure!
@@ -239,11 +241,13 @@ public abstract class AbstractClientBase extends
ServiceSupport
// for APIs that return body on status 400, such as
// Composite API we need content as well
- callback.onResponse(getContentAsInputStream(),
headers, exception);
+ httpClient.getWorkerPool()
+ .execute(() ->
callback.onResponse(getContentAsInputStream(), headers, exception));
} else {
// Success!!!
- callback.onResponse(getContentAsInputStream(),
headers, null);
+ httpClient.getWorkerPool()
+ .execute(() ->
callback.onResponse(getContentAsInputStream(), headers, null));
}
}
} finally {
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/RestApiIntegrationTest.java
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/RestApiIntegrationTest.java
index 68dd5b9..27899a6 100644
---
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/RestApiIntegrationTest.java
+++
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/RestApiIntegrationTest.java
@@ -471,6 +471,12 @@ public class RestApiIntegrationTest extends
AbstractSalesforceTestBase {
}
@Test
+ public void querySyncAsyncDoesntTimeout() throws Exception {
+ final Object result = template.requestBody("direct:querySyncAsync",
"");
+ assertNotNull(result);
+ }
+
+ @Test
public void testParentRelationshipQuery() throws Exception {
try {
createAccountAndContact();
@@ -752,6 +758,16 @@ public class RestApiIntegrationTest extends
AbstractSalesforceTestBase {
.to("salesforce:queryAll?sObjectQuery=SELECT name from
Line_Item__c&sObjectClass="
+ QueryRecordsLine_Item__c.class.getName() +
"&format=" + format);
+ from("direct:querySyncAsync")
+ .to("direct:querySync")
+ .to("direct:queryAsync");
+
+
from("direct:querySync?synchronous=false").routeId("r.querySync")
+
.to("salesforce:query?rawPayload=true&sObjectQuery=Select Id From Contact Where
Name = 'Sync'");
+
+
from("direct:queryAsync?synchronous=true").routeId("r.queryAsync")
+
.to("salesforce:query?rawPayload=true&sObjectQuery=Select Id From Contact
Where Name = 'Sync'");
+
// testSearch
from("direct:search").to("salesforce:search?sObjectSearch=FIND
{Wee}&format=" + format);
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBaseTest.java
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBaseTest.java
index 83df29d..af35dfe 100644
---
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBaseTest.java
+++
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBaseTest.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.salesforce.internal.client;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
@@ -149,6 +150,9 @@ public class AbstractClientBaseTest {
when(conversation.getAttribute(SalesforceSecurityHandler.AUTHENTICATION_REQUEST_ATTRIBUTE))
.thenReturn(salesforceRequest);
+ final ExecutorService executor = mock(ExecutorService.class);
+ when(client.httpClient.getWorkerPool()).thenReturn(executor);
+
// completes the request
listener.getValue().onComplete(result);