This is an automated email from the ASF dual-hosted git repository.

jeremyross pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 38346ba  CAMEL-16877: camel-salesforce: Fix timeouts...
38346ba is described below

commit 38346ba49fb4f1dcb5fec138a622f83b908b6602
Author: Jeremy Ross <[email protected]>
AuthorDate: Wed Sep 15 11:05:55 2021 -0500

    CAMEL-16877: camel-salesforce: Fix timeouts...
    
    occurring in synchronous/transacted routes.
---
 .../component/salesforce/SalesforceComponent.java  | 37 +++++++++++++++++++---
 .../component/salesforce/SalesforceHttpClient.java | 34 ++++++++++++++++----
 .../internal/client/AbstractClientBase.java        | 20 +++++++-----
 .../salesforce/RestApiIntegrationTest.java         | 16 ++++++++++
 .../internal/client/AbstractClientBaseTest.java    |  4 +++
 5 files changed, 92 insertions(+), 19 deletions(-)

diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
index a4fcd33..855194e 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
@@ -188,6 +188,13 @@ public class SalesforceComponent extends DefaultComponent 
implements SSLContextP
               label = "common,advanced")
     private Map<String, Object> httpClientProperties;
 
+    @Metadata(description = "Size of the thread pool used to handle HTTP 
responses.",
+              label = "common,advanced", defaultValue = "10")
+    private int workerPoolSize = 10;
+    @Metadata(description = "Maximum size of the thread pool used to handle 
HTTP responses.",
+              label = "common,advanced", defaultValue = "20")
+    private int workerPoolMaxSize = 20;
+
     @Metadata(description = "Used to set any properties that can be configured 
on the LongPollingTransport used by the"
                             + " BayeuxClient (CometD) used by the streaming 
api",
               label = "common,advanced")
@@ -389,7 +396,7 @@ public class SalesforceComponent extends DefaultComponent 
implements SSLContextP
             final SslContextFactory sslContextFactory = new 
SslContextFactory();
             
sslContextFactory.setSslContext(contextParameters.createSSLContext(getCamelContext()));
 
-            httpClient = createHttpClient(sslContextFactory);
+            httpClient = createHttpClient(this, sslContextFactory, 
getCamelContext(), workerPoolSize, workerPoolMaxSize);
             if (config != null) {
                 config.setHttpClient(httpClient);
             }
@@ -730,6 +737,22 @@ public class SalesforceComponent extends DefaultComponent 
implements SSLContextP
         this.httpProxyUseDigestAuth = httpProxyUseDigestAuth;
     }
 
+    public int getWorkerPoolSize() {
+        return workerPoolSize;
+    }
+
+    public void setWorkerPoolSize(int workerPoolSize) {
+        this.workerPoolSize = workerPoolSize;
+    }
+
+    public int getWorkerPoolMaxSize() {
+        return workerPoolMaxSize;
+    }
+
+    public void setWorkerPoolMaxSize(int workerPoolMaxSize) {
+        this.workerPoolMaxSize = workerPoolMaxSize;
+    }
+
     public String getPackages() {
         return packages;
     }
@@ -795,7 +818,8 @@ public class SalesforceComponent extends DefaultComponent 
implements SSLContextP
         final SslContextFactory sslContextFactory = new SslContextFactory();
         
sslContextFactory.setSslContext(sslContextParameters.createSSLContext(camelContext));
 
-        final SalesforceHttpClient httpClient = 
createHttpClient(sslContextFactory);
+        final SalesforceHttpClient httpClient
+                = createHttpClient("SalesforceComponent", sslContextFactory, 
camelContext, 10, 20);
         setupHttpClient(httpClient, camelContext, properties);
 
         final SalesforceSession session = new SalesforceSession(camelContext, 
httpClient, httpClient.getTimeout(), loginConfig);
@@ -808,10 +832,15 @@ public class SalesforceComponent extends DefaultComponent 
implements SSLContextP
         return new DefaultRawClient(httpClient, "", session, loginConfig);
     }
 
-    static SalesforceHttpClient createHttpClient(final SslContextFactory 
sslContextFactory) {
+    static SalesforceHttpClient createHttpClient(
+            Object source, final SslContextFactory sslContextFactory, final 
CamelContext context, int workerPoolSize,
+            int workerPoolMaxSize) {
         SecurityUtils.adaptToIBMCipherNames(sslContextFactory);
 
-        final SalesforceHttpClient httpClient = new 
SalesforceHttpClient(sslContextFactory);
+        final SalesforceHttpClient httpClient = new SalesforceHttpClient(
+                context, 
context.getExecutorServiceManager().newThreadPool(source, 
"SalesforceHttpClient", workerPoolSize,
+                        workerPoolMaxSize),
+                sslContextFactory);
         // default settings, use httpClientProperties to set other
         // properties
         httpClient.setConnectTimeout(CONNECTION_TIMEOUT);
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java
index 0fce4bb..e12403c 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java
@@ -19,13 +19,15 @@ package org.apache.camel.component.salesforce;
 import java.lang.reflect.Method;
 import java.net.URI;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.component.salesforce.internal.SalesforceSession;
 import 
org.apache.camel.component.salesforce.internal.client.SalesforceHttpRequest;
 import 
org.apache.camel.component.salesforce.internal.client.SalesforceSecurityHandler;
 import org.eclipse.jetty.client.HttpClient;
-import org.eclipse.jetty.client.HttpClientTransport;
 import org.eclipse.jetty.client.HttpConversation;
 import org.eclipse.jetty.client.HttpRequest;
 import org.eclipse.jetty.client.ProtocolHandler;
@@ -33,8 +35,6 @@ import org.eclipse.jetty.client.api.Request;
 import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
 
-import static java.util.Optional.ofNullable;
-
 /**
  * Custom Salesforce HTTP Client that creates {@link SalesforceHttpRequest} 
requests.
  */
@@ -45,6 +45,7 @@ public class SalesforceHttpClient extends HttpClient {
 
     private static final int DEFAULT_MAX_RETRIES = 3;
     private static final int DEFAULT_MAX_CONTENT_LENGTH = 4 * 1024 * 1024;
+    private final CamelContext camelContext;
 
     private SalesforceSession session;
     private int maxRetries = DEFAULT_MAX_RETRIES;
@@ -52,19 +53,22 @@ public class SalesforceHttpClient extends HttpClient {
     private long timeout = DEFAULT_TIMEOUT;
 
     private final Method addProtocolHandlerMethod;
-
     private final Method getProtocolHandlersMethod;
 
+    private final ExecutorService workerPool;
+
     public SalesforceHttpClient() {
         this(null);
     }
 
     public SalesforceHttpClient(SslContextFactory sslContextFactory) {
-        this(null, sslContextFactory);
+        this(null, Executors.newCachedThreadPool(), sslContextFactory);
     }
 
-    public SalesforceHttpClient(HttpClientTransport transport, 
SslContextFactory sslContextFactory) {
-        super(ofNullable(transport).orElse(new HttpClientTransportOverHTTP()), 
sslContextFactory);
+    public SalesforceHttpClient(CamelContext context, ExecutorService 
workerPool, SslContextFactory sslContextFactory) {
+        super(new HttpClientTransportOverHTTP(), sslContextFactory);
+        this.workerPool = workerPool;
+        this.camelContext = context;
 
         // Jetty 9.3, as opposed to 9.2 the way to add ProtocolHandler to
         // HttpClient changed in 9.2 HttpClient::getProtocolHandlers returned
@@ -111,6 +115,18 @@ public class SalesforceHttpClient extends HttpClient {
         super.doStart();
     }
 
+    @Override
+    protected void doStop() throws Exception {
+        if (workerPool != null) {
+            if (camelContext != null) {
+                
camelContext.getExecutorServiceManager().shutdownGraceful(workerPool);
+            } else {
+                workerPool.shutdown();
+            }
+        }
+        super.doStop();
+    }
+
     public SalesforceSession getSession() {
         return session;
     }
@@ -142,4 +158,8 @@ public class SalesforceHttpClient extends HttpClient {
     public void setTimeout(long timeout) {
         this.timeout = timeout;
     }
+
+    public ExecutorService getWorkerPool() {
+        return workerPool;
+    }
 }
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
index 07d95cd..1b9b221 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
@@ -200,12 +200,14 @@ public abstract class AbstractClientBase extends 
ServiceSupport
                         // from SalesforceSecurityHandler
                         Throwable failure = result.getFailure();
                         if (failure instanceof SalesforceException) {
-                            callback.onResponse(null, headers, 
(SalesforceException) failure);
+                            httpClient.getWorkerPool()
+                                    .execute(() -> callback.onResponse(null, 
headers, (SalesforceException) failure));
                         } else {
                             final String msg = String.format("Unexpected error 
{%s:%s} executing {%s:%s}", response.getStatus(),
                                     response.getReason(), request.getMethod(),
                                     request.getURI());
-                            callback.onResponse(null, headers, new 
SalesforceException(msg, response.getStatus(), failure));
+                            httpClient.getWorkerPool().execute(() -> 
callback.onResponse(null, headers,
+                                    new SalesforceException(msg, 
response.getStatus(), failure)));
                         }
                     } else {
 
@@ -224,14 +226,14 @@ public abstract class AbstractClientBase extends 
ServiceSupport
                                 session.parseLoginResponse(contentResponse, 
getContentAsString());
                                 final String msg = String.format("Unexpected 
Error {%s:%s} executing {%s:%s}", status,
                                         response.getReason(), 
request.getMethod(), request.getURI());
-                                callback.onResponse(null, headers, new 
SalesforceException(msg, null));
-
+                                httpClient.getWorkerPool()
+                                        .execute(() -> 
callback.onResponse(null, headers, new SalesforceException(msg, null)));
                             } catch (SalesforceException e) {
 
                                 final String msg = String.format("Error 
{%s:%s} executing {%s:%s}", status,
                                         response.getReason(), 
request.getMethod(), request.getURI());
-                                callback.onResponse(null, headers, new 
SalesforceException(msg, response.getStatus(), e));
-
+                                httpClient.getWorkerPool().execute(() -> 
callback.onResponse(null, headers,
+                                        new SalesforceException(msg, 
response.getStatus(), e)));
                             }
                         } else if (status < HttpStatus.OK_200 || status >= 
HttpStatus.MULTIPLE_CHOICES_300) {
                             // Salesforce HTTP failure!
@@ -239,11 +241,13 @@ public abstract class AbstractClientBase extends 
ServiceSupport
 
                             // for APIs that return body on status 400, such as
                             // Composite API we need content as well
-                            callback.onResponse(getContentAsInputStream(), 
headers, exception);
+                            httpClient.getWorkerPool()
+                                    .execute(() -> 
callback.onResponse(getContentAsInputStream(), headers, exception));
                         } else {
 
                             // Success!!!
-                            callback.onResponse(getContentAsInputStream(), 
headers, null);
+                            httpClient.getWorkerPool()
+                                    .execute(() -> 
callback.onResponse(getContentAsInputStream(), headers, null));
                         }
                     }
                 } finally {
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/RestApiIntegrationTest.java
 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/RestApiIntegrationTest.java
index 68dd5b9..27899a6 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/RestApiIntegrationTest.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/RestApiIntegrationTest.java
@@ -471,6 +471,12 @@ public class RestApiIntegrationTest extends 
AbstractSalesforceTestBase {
     }
 
     @Test
+    public void querySyncAsyncDoesntTimeout() throws Exception {
+        final Object result = template.requestBody("direct:querySyncAsync", 
"");
+        assertNotNull(result);
+    }
+
+    @Test
     public void testParentRelationshipQuery() throws Exception {
         try {
             createAccountAndContact();
@@ -752,6 +758,16 @@ public class RestApiIntegrationTest extends 
AbstractSalesforceTestBase {
                         .to("salesforce:queryAll?sObjectQuery=SELECT name from 
Line_Item__c&sObjectClass="
                             + QueryRecordsLine_Item__c.class.getName() + 
"&format=" + format);
 
+                from("direct:querySyncAsync")
+                        .to("direct:querySync")
+                        .to("direct:queryAsync");
+
+                
from("direct:querySync?synchronous=false").routeId("r.querySync")
+                        
.to("salesforce:query?rawPayload=true&sObjectQuery=Select Id From Contact Where 
Name = 'Sync'");
+
+                
from("direct:queryAsync?synchronous=true").routeId("r.queryAsync")
+                        
.to("salesforce:query?rawPayload=true&sObjectQuery=Select Id From Contact  
Where Name = 'Sync'");
+
                 // testSearch
                 from("direct:search").to("salesforce:search?sObjectSearch=FIND 
{Wee}&format=" + format);
 
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBaseTest.java
 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBaseTest.java
index 83df29d..af35dfe 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBaseTest.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBaseTest.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.salesforce.internal.client;
 import java.io.InputStream;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
@@ -149,6 +150,9 @@ public class AbstractClientBaseTest {
         
when(conversation.getAttribute(SalesforceSecurityHandler.AUTHENTICATION_REQUEST_ATTRIBUTE))
                 .thenReturn(salesforceRequest);
 
+        final ExecutorService executor = mock(ExecutorService.class);
+        when(client.httpClient.getWorkerPool()).thenReturn(executor);
+
         // completes the request
         listener.getValue().onComplete(result);
 

Reply via email to