Fix re-login on auth token expiration for Bulk API For Bulk API the auth token expired state is reported by Salesforce using a 400 HTTP status code and an Error with `InvalidSessionId` code (see https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/asynch_api_reference_errors.htm).
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/81392e12 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/81392e12 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/81392e12 Branch: refs/heads/camel-2.16.x Commit: 81392e129ccf4d21c30de3d6a4602f3b57dfcf0c Parents: 96e9c8f Author: pingw33n <pingw...@emphased.net> Authored: Thu Feb 4 11:21:20 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Feb 5 09:54:56 2016 +0100 ---------------------------------------------------------------------- .../internal/client/AbstractClientBase.java | 70 ++++++++++++-------- .../client/SalesforceSecurityListener.java | 34 ++++++++++ .../salesforce/BulkApiIntegrationTest.java | 61 +++++++++++++++++ 3 files changed, 138 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/81392e12/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java index b0c7442..b61c161 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java @@ -103,11 +103,46 @@ public abstract class AbstractClientBase implements SalesforceSession.Salesforce protected void doHttpRequest(final ContentExchange request, final ClientResponseCallback callback) { + // use SalesforceSecurityListener for security login retries + final SalesforceSecurityListener securityListener; + try { + final boolean isHttps = HttpSchemes.HTTPS.equals(String.valueOf(request.getScheme())); + securityListener = new SalesforceSecurityListener( + httpClient.getDestination(request.getAddress(), isHttps), + request, session, accessToken) { + + private String reason; + + @Override + public void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException { + super.onResponseStatus(version, status, reason); + // remember status reason + this.reason = reason.toString(StringUtil.__ISO_8859_1); + } + + @Override + protected SalesforceException createExceptionResponse() { + final int responseStatus = request.getResponseStatus(); + if (responseStatus < HttpStatus.OK_200 || responseStatus >= HttpStatus.MULTIPLE_CHOICES_300) { + final String msg = String.format("Error {%s:%s} executing {%s:%s}", + responseStatus, reason, request.getMethod(), request.getRequestURI()); + return new SalesforceException(msg, responseStatus, createRestException(request, reason)); + } else { + return super.createExceptionResponse(); + } + } + }; + } catch (IOException e) { + // propagate exception + callback.onResponse(null, new SalesforceException( + String.format("Error registering security listener: %s", e.getMessage()), + e)); + return; + } + // use HttpEventListener for lifecycle events request.setEventListener(new HttpEventListenerWrapper(request.getEventListener(), true) { - public String reason; - @Override public void onConnectionFailed(Throwable ex) { super.onConnectionFailed(ex); @@ -133,12 +168,9 @@ public abstract class AbstractClientBase implements SalesforceSession.Salesforce public void onResponseComplete() throws IOException { super.onResponseComplete(); - final int responseStatus = request.getResponseStatus(); - if (responseStatus < HttpStatus.OK_200 || responseStatus >= HttpStatus.MULTIPLE_CHOICES_300) { - final String msg = String.format("Error {%s:%s} executing {%s:%s}", - responseStatus, reason, request.getMethod(), request.getRequestURI()); - final SalesforceException exception = new SalesforceException(msg, responseStatus, createRestException(request, reason)); - callback.onResponse(null, exception); + SalesforceException e = securityListener.getExceptionResponse(); + if (e != null) { + callback.onResponse(null, e); } else { // TODO not memory efficient for large response messages, // doesn't seem to be possible in Jetty 7 to directly stream to response parsers @@ -147,27 +179,11 @@ public abstract class AbstractClientBase implements SalesforceSession.Salesforce } } - - @Override - public void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException { - super.onResponseStatus(version, status, reason); - // remember status reason - this.reason = reason.toString(StringUtil.__ISO_8859_1); - } }); - // use SalesforceSecurityListener for security login retries - try { - final boolean isHttps = HttpSchemes.HTTPS.equals(String.valueOf(request.getScheme())); - request.setEventListener(new SalesforceSecurityListener( - httpClient.getDestination(request.getAddress(), isHttps), - request, session, accessToken)); - } catch (IOException e) { - // propagate exception - callback.onResponse(null, new SalesforceException( - String.format("Error registering security listener: %s", e.getMessage()), - e)); - } + // wrap the above lifecycle event listener with SalesforceSecurityListener + securityListener.setEventListener(request.getEventListener()); + request.setEventListener(securityListener); // execute the request try { http://git-wip-us.apache.org/repos/asf/camel/blob/81392e12/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceSecurityListener.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceSecurityListener.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceSecurityListener.java index 27d838a..2ef9e98 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceSecurityListener.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceSecurityListener.java @@ -42,6 +42,7 @@ public class SalesforceSecurityListener extends HttpEventListenerWrapper { private boolean retrying; private boolean requestComplete; private boolean responseComplete; + private SalesforceException exceptionResponse; public SalesforceSecurityListener(HttpDestination destination, HttpExchange exchange, SalesforceSession session, String accessToken) { @@ -75,17 +76,46 @@ public class SalesforceSecurityListener extends HttpEventListenerWrapper { @Override public void onResponseComplete() throws IOException { responseComplete = true; + + exceptionResponse = createExceptionResponse(); + if (!retrying && exceptionResponse != null && isInvalidSessionError(exceptionResponse)) { + if (LOG.isWarnEnabled()) { + LOG.warn("Retrying on Salesforce InvalidSessionId error: {}", + getRootSalesforceException(exceptionResponse).getMessage()); + } + retrying = true; + } + if (checkExchangeComplete()) { super.onResponseComplete(); } } + private boolean isInvalidSessionError(SalesforceException e) { + e = getRootSalesforceException(e); + return e.getErrors() != null && + e.getErrors().size() == 1 && + "InvalidSessionId".equals(e.getErrors().get(0).getErrorCode()); + } + + private SalesforceException getRootSalesforceException(SalesforceException e) { + while (e.getCause() instanceof SalesforceException) { + e = (SalesforceException) e.getCause(); + } + return e; + } + + protected SalesforceException createExceptionResponse() { + return null; + } + private boolean checkExchangeComplete() throws IOException { if (retrying && requestComplete && responseComplete) { LOG.debug("Authentication Error, retrying: {}", exchange); requestComplete = false; responseComplete = false; + exceptionResponse = null; setDelegatingRequests(true); setDelegatingResponses(true); @@ -137,6 +167,7 @@ public class SalesforceSecurityListener extends HttpEventListenerWrapper { requestComplete = false; responseComplete = false; + exceptionResponse = null; } super.onRetry(); } @@ -157,4 +188,7 @@ public class SalesforceSecurityListener extends HttpEventListenerWrapper { super.onException(ex); } + public SalesforceException getExceptionResponse() { + return exceptionResponse; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/81392e12/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/BulkApiIntegrationTest.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/BulkApiIntegrationTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/BulkApiIntegrationTest.java new file mode 100644 index 0000000..b8c6dfe --- /dev/null +++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/BulkApiIntegrationTest.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.salesforce; + +import org.apache.camel.component.salesforce.api.dto.bulk.ContentType; +import org.apache.camel.component.salesforce.api.dto.bulk.JobInfo; +import org.apache.camel.component.salesforce.api.dto.bulk.OperationEnum; +import org.apache.camel.component.salesforce.dto.generated.Merchandise__c; +import org.apache.camel.util.jsse.SSLContextParameters; +import org.eclipse.jetty.client.ContentExchange; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.HttpExchange; +import org.eclipse.jetty.client.RedirectListener; +import org.eclipse.jetty.http.HttpMethods; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.junit.Test; + +public class BulkApiIntegrationTest extends AbstractBulkApiTestBase { + + @Test + public void testRetry() throws Exception { + SalesforceComponent sf = context().getComponent("salesforce", SalesforceComponent.class); + String accessToken = sf.getSession().getAccessToken(); + + SslContextFactory sslContextFactory = new SslContextFactory(); + sslContextFactory.setSslContext(new SSLContextParameters().createSSLContext()); + HttpClient httpClient = new HttpClient(sslContextFactory); + httpClient.setConnectTimeout(60000); + httpClient.setTimeout(60000); + httpClient.registerListener(RedirectListener.class.getName()); + httpClient.start(); + + ContentExchange logoutGet = new ContentExchange(true); + logoutGet.setURL(sf.getLoginConfig().getLoginUrl() + "/services/oauth2/revoke?token=" + accessToken); + logoutGet.setMethod(HttpMethods.GET); + httpClient.send(logoutGet); + assertEquals(HttpExchange.STATUS_COMPLETED, logoutGet.waitForDone()); + assertEquals(HttpStatus.OK_200, logoutGet.getResponseStatus()); + + JobInfo jobInfo = new JobInfo(); + jobInfo.setOperation(OperationEnum.INSERT); + jobInfo.setContentType(ContentType.CSV); + jobInfo.setObject(Merchandise__c.class.getSimpleName()); + createJob(jobInfo); + } +} \ No newline at end of file