Repository: camel
Updated Branches:
  refs/heads/camel-2.16.x 96e9c8f05 -> 532de5b92
  refs/heads/master 6bce5cd97 -> bf93058b3


Fix re-login on auth token expiration for Bulk API

 For the Bulk API, Salesforce reports an expired auth token with an HTTP 400 
status code and an error whose code is `InvalidSessionId` (see 
https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/asynch_api_reference_errors.htm).


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/30ce39ff
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/30ce39ff
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/30ce39ff

Branch: refs/heads/master
Commit: 30ce39ff694048f0ae6428577b91f24f26c707dd
Parents: 6bce5cd
Author: pingw33n <pingw...@emphased.net>
Authored: Thu Feb 4 11:21:20 2016 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Fri Feb 5 09:37:53 2016 +0100

----------------------------------------------------------------------
 .../internal/client/AbstractClientBase.java     | 70 ++++++++++++--------
 .../client/SalesforceSecurityListener.java      | 34 ++++++++++
 .../salesforce/BulkApiIntegrationTest.java      | 61 +++++++++++++++++
 3 files changed, 138 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/30ce39ff/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
----------------------------------------------------------------------
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
index b0c7442..b61c161 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
@@ -103,11 +103,46 @@ public abstract class AbstractClientBase implements 
SalesforceSession.Salesforce
 
     protected void doHttpRequest(final ContentExchange request, final 
ClientResponseCallback callback) {
 
+        // use SalesforceSecurityListener for security login retries
+        final SalesforceSecurityListener securityListener;
+        try {
+            final boolean isHttps = 
HttpSchemes.HTTPS.equals(String.valueOf(request.getScheme()));
+            securityListener = new SalesforceSecurityListener(
+                    httpClient.getDestination(request.getAddress(), isHttps),
+                    request, session, accessToken) {
+
+                private String reason;
+
+                @Override
+                public void onResponseStatus(Buffer version, int status, 
Buffer reason) throws IOException {
+                    super.onResponseStatus(version, status, reason);
+                    // remember status reason
+                    this.reason = reason.toString(StringUtil.__ISO_8859_1);
+                }
+
+                @Override
+                protected SalesforceException createExceptionResponse() {
+                    final int responseStatus = request.getResponseStatus();
+                    if (responseStatus < HttpStatus.OK_200 || responseStatus 
>= HttpStatus.MULTIPLE_CHOICES_300) {
+                        final String msg = String.format("Error {%s:%s} 
executing {%s:%s}",
+                                responseStatus, reason, request.getMethod(), 
request.getRequestURI());
+                        return new SalesforceException(msg, responseStatus, 
createRestException(request, reason));
+                    } else {
+                        return super.createExceptionResponse();
+                    }
+                }
+            };
+        } catch (IOException e) {
+            // propagate exception
+            callback.onResponse(null, new SalesforceException(
+                    String.format("Error registering security listener: %s", 
e.getMessage()),
+                    e));
+            return;
+        }
+
         // use HttpEventListener for lifecycle events
         request.setEventListener(new 
HttpEventListenerWrapper(request.getEventListener(), true) {
 
-            public String reason;
-
             @Override
             public void onConnectionFailed(Throwable ex) {
                 super.onConnectionFailed(ex);
@@ -133,12 +168,9 @@ public abstract class AbstractClientBase implements 
SalesforceSession.Salesforce
             public void onResponseComplete() throws IOException {
                 super.onResponseComplete();
 
-                final int responseStatus = request.getResponseStatus();
-                if (responseStatus < HttpStatus.OK_200 || responseStatus >= 
HttpStatus.MULTIPLE_CHOICES_300) {
-                    final String msg = String.format("Error {%s:%s} executing 
{%s:%s}",
-                            responseStatus, reason, request.getMethod(), 
request.getRequestURI());
-                    final SalesforceException exception = new 
SalesforceException(msg, responseStatus, createRestException(request, reason));
-                    callback.onResponse(null, exception);
+                SalesforceException e = 
securityListener.getExceptionResponse();
+                if (e != null) {
+                    callback.onResponse(null, e);
                 } else {
                     // TODO not memory efficient for large response messages,
                     // doesn't seem to be possible in Jetty 7 to directly 
stream to response parsers
@@ -147,27 +179,11 @@ public abstract class AbstractClientBase implements 
SalesforceSession.Salesforce
                 }
 
             }
-
-            @Override
-            public void onResponseStatus(Buffer version, int status, Buffer 
reason) throws IOException {
-                super.onResponseStatus(version, status, reason);
-                // remember status reason
-                this.reason = reason.toString(StringUtil.__ISO_8859_1);
-            }
         });
 
-        // use SalesforceSecurityListener for security login retries
-        try {
-            final boolean isHttps = 
HttpSchemes.HTTPS.equals(String.valueOf(request.getScheme()));
-            request.setEventListener(new SalesforceSecurityListener(
-                    httpClient.getDestination(request.getAddress(), isHttps),
-                    request, session, accessToken));
-        } catch (IOException e) {
-            // propagate exception
-            callback.onResponse(null, new SalesforceException(
-                    String.format("Error registering security listener: %s", 
e.getMessage()),
-                    e));
-        }
+        // wrap the above lifecycle event listener with 
SalesforceSecurityListener
+        securityListener.setEventListener(request.getEventListener());
+        request.setEventListener(securityListener);
 
         // execute the request
         try {

http://git-wip-us.apache.org/repos/asf/camel/blob/30ce39ff/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceSecurityListener.java
----------------------------------------------------------------------
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceSecurityListener.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceSecurityListener.java
index 27d838a..2ef9e98 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceSecurityListener.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceSecurityListener.java
@@ -42,6 +42,7 @@ public class SalesforceSecurityListener extends 
HttpEventListenerWrapper {
     private boolean retrying;
     private boolean requestComplete;
     private boolean responseComplete;
+    private SalesforceException exceptionResponse;
 
     public SalesforceSecurityListener(HttpDestination destination, 
HttpExchange exchange,
                                       SalesforceSession session, String 
accessToken) {
@@ -75,17 +76,46 @@ public class SalesforceSecurityListener extends 
HttpEventListenerWrapper {
     @Override
     public void onResponseComplete() throws IOException {
         responseComplete = true;
+
+        exceptionResponse = createExceptionResponse();
+        if (!retrying && exceptionResponse != null && 
isInvalidSessionError(exceptionResponse)) {
+            if (LOG.isWarnEnabled()) {
+                LOG.warn("Retrying on Salesforce InvalidSessionId error: {}",
+                        
getRootSalesforceException(exceptionResponse).getMessage());
+            }
+            retrying = true;
+        }
+
         if (checkExchangeComplete()) {
             super.onResponseComplete();
         }
     }
 
+    private boolean isInvalidSessionError(SalesforceException e) {
+        e = getRootSalesforceException(e);
+        return e.getErrors() != null &&
+                    e.getErrors().size() == 1 &&
+                    
"InvalidSessionId".equals(e.getErrors().get(0).getErrorCode());
+    }
+
+    private SalesforceException getRootSalesforceException(SalesforceException 
e) {
+        while (e.getCause() instanceof SalesforceException) {
+            e = (SalesforceException) e.getCause();
+        }
+        return e;
+    }
+
+    protected SalesforceException createExceptionResponse() {
+        return null;
+    }
+
     private boolean checkExchangeComplete() throws IOException {
         if (retrying && requestComplete && responseComplete) {
             LOG.debug("Authentication Error, retrying: {}", exchange);
 
             requestComplete = false;
             responseComplete = false;
+            exceptionResponse = null;
 
             setDelegatingRequests(true);
             setDelegatingResponses(true);
@@ -137,6 +167,7 @@ public class SalesforceSecurityListener extends 
HttpEventListenerWrapper {
 
             requestComplete = false;
             responseComplete = false;
+            exceptionResponse = null;
         }
         super.onRetry();
     }
@@ -157,4 +188,7 @@ public class SalesforceSecurityListener extends 
HttpEventListenerWrapper {
         super.onException(ex);
     }
 
+    public SalesforceException getExceptionResponse() {
+        return exceptionResponse;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/30ce39ff/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/BulkApiIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/BulkApiIntegrationTest.java
 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/BulkApiIntegrationTest.java
new file mode 100644
index 0000000..b8c6dfe
--- /dev/null
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/BulkApiIntegrationTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce;
+
+import org.apache.camel.component.salesforce.api.dto.bulk.ContentType;
+import org.apache.camel.component.salesforce.api.dto.bulk.JobInfo;
+import org.apache.camel.component.salesforce.api.dto.bulk.OperationEnum;
+import org.apache.camel.component.salesforce.dto.generated.Merchandise__c;
+import org.apache.camel.util.jsse.SSLContextParameters;
+import org.eclipse.jetty.client.ContentExchange;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.HttpExchange;
+import org.eclipse.jetty.client.RedirectListener;
+import org.eclipse.jetty.http.HttpMethods;
+import org.eclipse.jetty.http.HttpStatus;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.junit.Test;
+
+public class BulkApiIntegrationTest extends AbstractBulkApiTestBase {
+
+    @Test
+    public void testRetry() throws Exception {
+        SalesforceComponent sf = context().getComponent("salesforce", 
SalesforceComponent.class);
+        String accessToken = sf.getSession().getAccessToken();
+
+        SslContextFactory sslContextFactory = new SslContextFactory();
+        sslContextFactory.setSslContext(new 
SSLContextParameters().createSSLContext());
+        HttpClient httpClient = new HttpClient(sslContextFactory);
+        httpClient.setConnectTimeout(60000);
+        httpClient.setTimeout(60000);
+        httpClient.registerListener(RedirectListener.class.getName());
+        httpClient.start();
+
+        ContentExchange logoutGet = new ContentExchange(true);
+        logoutGet.setURL(sf.getLoginConfig().getLoginUrl() + 
"/services/oauth2/revoke?token=" + accessToken);
+        logoutGet.setMethod(HttpMethods.GET);
+        httpClient.send(logoutGet);
+        assertEquals(HttpExchange.STATUS_COMPLETED, logoutGet.waitForDone());
+        assertEquals(HttpStatus.OK_200, logoutGet.getResponseStatus());
+
+        JobInfo jobInfo = new JobInfo();
+        jobInfo.setOperation(OperationEnum.INSERT);
+        jobInfo.setContentType(ContentType.CSV);
+        jobInfo.setObject(Merchandise__c.class.getSimpleName());
+        createJob(jobInfo);
+    }
+}
\ No newline at end of file

Reply via email to