http://git-wip-us.apache.org/repos/asf/camel/blob/72a1767e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java new file mode 100644 index 0000000..a16a2a3 --- /dev/null +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java @@ -0,0 +1,336 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.salesforce.internal; + +import org.apache.camel.Service; +import org.codehaus.jackson.map.ObjectMapper; +import org.eclipse.jetty.client.ContentExchange; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.HttpExchange; +import org.eclipse.jetty.http.HttpMethods; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.io.Buffer; +import org.eclipse.jetty.io.ByteArrayBuffer; +import org.eclipse.jetty.util.StringUtil; +import org.eclipse.jetty.util.UrlEncoded; +import org.apache.camel.component.salesforce.SalesforceLoginConfig; +import org.apache.camel.component.salesforce.api.SalesforceException; +import org.apache.camel.component.salesforce.api.dto.RestError; +import org.apache.camel.component.salesforce.internal.dto.LoginError; +import org.apache.camel.component.salesforce.internal.dto.LoginToken; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.CopyOnWriteArraySet; + +public class SalesforceSession implements Service { + + private static final String OAUTH2_REVOKE_PATH = "/services/oauth2/revoke?token="; + private static final String OAUTH2_TOKEN_PATH = "/services/oauth2/token"; + + private static final Logger LOG = LoggerFactory.getLogger(SalesforceSession.class); + private static final String FORM_CONTENT_TYPE = "application/x-www-form-urlencoded;charset=utf-8"; + + private final HttpClient httpClient; + + private final SalesforceLoginConfig config; + + private final ObjectMapper objectMapper; + private final Set<SalesforceSessionListener> listeners; + + private String accessToken; + private String instanceUrl; + + public SalesforceSession(HttpClient httpClient, SalesforceLoginConfig config) { + // validate parameters + assertNotNull("Null httpClient", httpClient); + assertNotNull("Null SalesforceLoginConfig", config); + assertNotNull("Null loginUrl", config.getLoginUrl()); + assertNotNull("Null clientId", config.getClientId()); + assertNotNull("Null clientSecret", config.getClientSecret()); + assertNotNull("Null userName", config.getUserName()); + assertNotNull("Null password", config.getPassword()); + + this.httpClient = httpClient; + this.config = config; + + // strip trailing '/' + String loginUrl = config.getLoginUrl(); + config.setLoginUrl(loginUrl.endsWith("/") ? loginUrl.substring(0, loginUrl.length() - 1) : loginUrl); + + this.objectMapper = new ObjectMapper(); + this.listeners = new CopyOnWriteArraySet<SalesforceSessionListener>(); + } + + private void assertNotNull(String s, Object o) { + if (o == null) { + throw new IllegalArgumentException(s); + } + } + + @SuppressWarnings("unchecked") + public synchronized String login(String oldToken) throws SalesforceException { + + // check if we need a new session + // this way there's always a single valid session + if ((accessToken == null) || accessToken.equals(oldToken)) { + + // try revoking the old access token before creating a new one + accessToken = oldToken; + if (accessToken != null) { + try { + logout(); + } catch (SalesforceException e) { + LOG.warn("Error revoking old access token: " + e.getMessage(), e); + } + accessToken = null; + } + + // login to Salesforce and get session id + final StatusExceptionExchange loginPost = new StatusExceptionExchange(true); + loginPost.setURL(config.getLoginUrl() + OAUTH2_TOKEN_PATH); + loginPost.setMethod(HttpMethods.POST); + loginPost.setRequestContentType(FORM_CONTENT_TYPE); + + final UrlEncoded nvps = new UrlEncoded(); + nvps.put("grant_type", "password"); + nvps.put("client_id", config.getClientId()); + nvps.put("client_secret", config.getClientSecret()); + nvps.put("username", config.getUserName()); + nvps.put("password", config.getPassword()); + nvps.put("format", "json"); + + try { + + // set form content + loginPost.setRequestContent(new ByteArrayBuffer( + nvps.encode(StringUtil.__UTF8, true).getBytes(StringUtil.__UTF8))); + httpClient.send(loginPost); + + // wait for the login to finish + final int exchangeState = loginPost.waitForDone(); + + switch (exchangeState) { + case HttpExchange.STATUS_COMPLETED: + final byte[] responseContent = loginPost.getResponseContentBytes(); + final int responseStatus = loginPost.getResponseStatus(); + switch (responseStatus) { + + case HttpStatus.OK_200: + // parse the response to get token + LoginToken token = objectMapper.readValue(responseContent, + LoginToken.class); + + // don't log token or instance URL for security reasons + LOG.info("Login successful"); + accessToken = token.getAccessToken(); + instanceUrl = token.getInstanceUrl(); + + // notify all listeners + for (SalesforceSessionListener listener : listeners) { + try { + listener.onLogin(accessToken, instanceUrl); + } catch (Throwable t) { + LOG.warn("Unexpected error from listener {}: {}", listener, t.getMessage()); + } + } + + break; + + case HttpStatus.BAD_REQUEST_400: + // parse the response to get error + final LoginError error = objectMapper.readValue(responseContent, + LoginError.class); + final String msg = String.format("Login error code:[%s] description:[%s]", + error.getError(), error.getErrorDescription()); + final List<RestError> errors = new ArrayList<RestError>(); + errors.add(new RestError(msg, error.getErrorDescription())); + throw new SalesforceException(errors, HttpStatus.BAD_REQUEST_400); + + default: + throw new SalesforceException( + String.format("Login error status:[%s] reason:[%s]", + responseStatus, loginPost.getReason()), + responseStatus); + } + break; + + case HttpExchange.STATUS_EXCEPTED: + final Throwable ex = loginPost.getException(); + throw new SalesforceException( + String.format("Unexpected login exception: %s", ex.getMessage()), + ex); + + case HttpExchange.STATUS_CANCELLED: + throw new SalesforceException("Login request CANCELLED!", null); + + case HttpExchange.STATUS_EXPIRED: + throw new SalesforceException("Login request TIMEOUT!", null); + + } + + } catch (IOException e) { + String msg = "Login error: unexpected exception " + e.getMessage(); + throw new SalesforceException(msg, e); + } catch (InterruptedException e) { + String msg = "Login error: unexpected exception " + e.getMessage(); + throw new SalesforceException(msg, e); + } + } + + return accessToken; + } + + public void logout() throws SalesforceException { + if (accessToken == null) { + return; + } + + StatusExceptionExchange logoutGet = new StatusExceptionExchange(true); + logoutGet.setURL(config.getLoginUrl() + OAUTH2_REVOKE_PATH + accessToken); + logoutGet.setMethod(HttpMethods.GET); + + try { + httpClient.send(logoutGet); + final int done = logoutGet.waitForDone(); + switch (done) { + + case HttpExchange.STATUS_COMPLETED: + final int statusCode = logoutGet.getResponseStatus(); + final String reason = logoutGet.getReason(); + + if (statusCode == HttpStatus.OK_200) { + LOG.info("Logout successful"); + } else { + throw new SalesforceException( + String.format("Logout error, code: [%s] reason: [%s]", + statusCode, reason), + statusCode); + } + break; + + case HttpExchange.STATUS_EXCEPTED: + final Throwable ex = logoutGet.getException(); + throw new SalesforceException("Unexpected logout exception: " + ex.getMessage(), ex); + + case HttpExchange.STATUS_CANCELLED: + throw new SalesforceException("Logout request CANCELLED!", null); + + case HttpExchange.STATUS_EXPIRED: + throw new SalesforceException("Logout request TIMEOUT!", null); + + } + + } catch (SalesforceException e) { + throw e; + } catch (Exception e) { + String msg = "Logout error: " + e.getMessage(); + throw new SalesforceException(msg, e); + } finally { + // reset session + accessToken = null; + instanceUrl = null; + // notify all session listeners of the new access token and instance url + for (SalesforceSessionListener listener : listeners) { + try { + listener.onLogout(); + } catch (Throwable t) { + LOG.warn("Unexpected error from listener {}: {}", listener, t.getMessage()); + } + } + } + } + + public String getAccessToken() { + return accessToken; + } + + public String getInstanceUrl() { + return instanceUrl; + } + + public boolean addListener(SalesforceSessionListener listener) { + return listeners.add(listener); + } + + public boolean removeListener(SalesforceSessionListener listener) { + return listeners.remove(listener); + } + + @Override + public void start() throws Exception { + // auto-login at start if needed + login(accessToken); + } + + @Override + public void stop() throws Exception { + // logout + logout(); + } + + /** + * Records status line, and exception from exchange. + * + * @author dbokde + */ + private static class StatusExceptionExchange extends ContentExchange { + + private String reason; + private Throwable exception; + + public StatusExceptionExchange(boolean cacheFields) { + super(cacheFields); + } + + @Override + protected synchronized void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException { + // remember reason + this.reason = reason.toString(StringUtil.__ISO_8859_1); + super.onResponseStatus(version, status, reason); + } + + @Override + protected void onConnectionFailed(Throwable x) { + this.exception = x; + super.onConnectionFailed(x); + } + + @Override + protected void onException(Throwable x) { + this.exception = x; + super.onException(x); + } + + public String getReason() { + return reason; + } + + public Throwable getException() { + return exception; + } + + } + + public static interface SalesforceSessionListener { + void onLogin(String accessToken, String instanceUrl); + void onLogout(); + } + +}
http://git-wip-us.apache.org/repos/asf/camel/blob/72a1767e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java new file mode 100644 index 0000000..6fe7028 --- /dev/null +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.salesforce.internal.client; + +import org.apache.camel.Service; +import org.eclipse.jetty.client.ContentExchange; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.HttpEventListenerWrapper; +import org.eclipse.jetty.client.HttpExchange; +import org.eclipse.jetty.http.HttpSchemes; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.io.Buffer; +import org.eclipse.jetty.util.StringUtil; +import org.apache.camel.component.salesforce.api.SalesforceException; +import org.apache.camel.component.salesforce.internal.SalesforceSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; + +public abstract class AbstractClientBase implements SalesforceSession.SalesforceSessionListener, Service { + + protected final Logger LOG = LoggerFactory.getLogger(getClass()); + + protected static final String APPLICATION_JSON_UTF8 = "application/json;charset=utf-8"; + protected static final String APPLICATION_XML_UTF8 = "application/xml;charset=utf-8"; + + protected final HttpClient httpClient; + protected final SalesforceSession session; + protected final String version; + + protected String accessToken; + protected String instanceUrl; + + public AbstractClientBase(String version, + SalesforceSession session, HttpClient httpClient) throws SalesforceException { + + this.version = version; + this.session = session; + this.httpClient = httpClient; + } + + public void start() throws Exception { + // local cache + accessToken = session.getAccessToken(); + if (accessToken == null) { + // lazy login here! + accessToken = session.login(accessToken); + } + instanceUrl = session.getInstanceUrl(); + + // also register this client as a session listener + session.addListener(this); + } + + @Override + public void stop() throws Exception { + // deregister listener + session.removeListener(this); + } + + @Override + public void onLogin(String accessToken, String instanceUrl) { + if (!accessToken.equals(this.accessToken)) { + this.accessToken = accessToken; + this.instanceUrl = instanceUrl; + } + } + + @Override + public void onLogout() { + // ignore, if this client makes another request with stale token, + // SalesforceSecurityListener will auto login! + } + + protected SalesforceExchange getContentExchange(String method, String url) { + SalesforceExchange get = new SalesforceExchange(); + get.setMethod(method); + get.setURL(url); + get.setClient(this); + return get; + } + + protected interface ClientResponseCallback { + void onResponse(InputStream response, SalesforceException ex); + } + + protected void doHttpRequest(final ContentExchange request, final ClientResponseCallback callback) { + + // use SalesforceSecurityListener for security login retries + try { + final boolean isHttps = HttpSchemes.HTTPS.equals(String.valueOf(request.getScheme())); + request.setEventListener(new SalesforceSecurityListener( + httpClient.getDestination(request.getAddress(), isHttps), + request, session, accessToken)); + } catch (IOException e) { + // propagate exception + callback.onResponse(null, new SalesforceException( + String.format("Error registering security listener: %s", e.getMessage()), + e)); + } + + // use HttpEventListener for lifecycle events + request.setEventListener(new HttpEventListenerWrapper(request.getEventListener(), true) { + + public String reason; + + @Override + public void onConnectionFailed(Throwable ex) { + super.onConnectionFailed(ex); + callback.onResponse(null, + new SalesforceException("Connection error: " + ex.getMessage(), ex)); + } + + @Override + public void onException(Throwable ex) { + super.onException(ex); + callback.onResponse(null, + new SalesforceException("Unexpected exception: " + ex.getMessage(), ex)); + } + + @Override + public void onExpire() { + super.onExpire(); + callback.onResponse(null, + new SalesforceException("Request expired", null)); + } + + @Override + public void onResponseComplete() throws IOException { + super.onResponseComplete(); + + final int responseStatus = request.getResponseStatus(); + if (responseStatus < HttpStatus.OK_200 || responseStatus >= HttpStatus.MULTIPLE_CHOICES_300) { + final String msg = String.format("Error {%s:%s} executing {%s:%s}", + responseStatus, reason, request.getMethod(), request.getRequestURI()); + final SalesforceException exception = new SalesforceException(msg, createRestException(request)); + exception.setStatusCode(responseStatus); + callback.onResponse(null, exception); + } else { + // TODO not memory efficient for large response messages, + // doesn't seem to be possible in Jetty 7 to directly stream to response parsers + final byte[] bytes = request.getResponseContentBytes(); + callback.onResponse(bytes != null ? new ByteArrayInputStream(bytes) : null, null); + } + + } + + @Override + public void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException { + super.onResponseStatus(version, status, reason); + // remember status reason + this.reason = reason.toString(StringUtil.__ISO_8859_1); + } + }); + + // execute the request + try { + httpClient.send(request); + } catch (IOException e) { + String msg = "Unexpected Error: " + e.getMessage(); + // send error through callback + callback.onResponse(null, new SalesforceException(msg, e)); + } + + } + + public void setAccessToken(String accessToken) { + this.accessToken = accessToken; + } + + public void setInstanceUrl(String instanceUrl) { + this.instanceUrl = instanceUrl; + } + + protected abstract void setAccessToken(HttpExchange httpExchange); + + protected abstract SalesforceException createRestException(ContentExchange httpExchange); + +} http://git-wip-us.apache.org/repos/asf/camel/blob/72a1767e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/BulkApiClient.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/BulkApiClient.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/BulkApiClient.java new file mode 100644 index 0000000..b00e3fd --- /dev/null +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/BulkApiClient.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.salesforce.internal.client; + +import org.apache.camel.component.salesforce.api.SalesforceException; +import org.apache.camel.component.salesforce.api.dto.bulk.*; + +import java.io.InputStream; +import java.util.List; + +/** + * Client interface for Salesforce Bulk API + */ +public interface BulkApiClient { + + public static interface JobInfoResponseCallback { + void onResponse(JobInfo jobInfo, SalesforceException ex); + } + + public static interface BatchInfoResponseCallback { + void onResponse(BatchInfo batchInfo, SalesforceException ex); + } + + public static interface BatchInfoListResponseCallback { + void onResponse(List<BatchInfo> batchInfoList, SalesforceException ex); + } + + public static interface StreamResponseCallback { + void onResponse(InputStream inputStream, SalesforceException ex); + } + + public static interface QueryResultIdsCallback { + void onResponse(List<String> ids, SalesforceException ex); + } + + /** + * Creates a Bulk Job + * + * @param jobInfo {@link JobInfo} with required fields + * @param callback {@link JobInfoResponseCallback} to be invoked on response or error + */ + void createJob(JobInfo jobInfo, + JobInfoResponseCallback callback); + + void getJob(String jobId, + JobInfoResponseCallback callback); + + void closeJob(String jobId, + JobInfoResponseCallback callback); + + void abortJob(String jobId, + JobInfoResponseCallback callback); + + void createBatch(InputStream batchStream, String jobId, ContentType contentTypeEnum, + BatchInfoResponseCallback callback); + + void getBatch(String jobId, String batchId, + BatchInfoResponseCallback callback); + + void getAllBatches(String jobId, + BatchInfoListResponseCallback callback); + + void getRequest(String jobId, String batchId, + StreamResponseCallback callback); + + void getResults(String jobId, String batchId, + StreamResponseCallback callback); + + void createBatchQuery(String jobId, String soqlQuery, ContentType jobContentType, + BatchInfoResponseCallback callback); + + void getQueryResultIds(String jobId, String batchId, + QueryResultIdsCallback callback); + + void getQueryResult(String jobId, String batchId, String resultId, + StreamResponseCallback callback); + +} http://git-wip-us.apache.org/repos/asf/camel/blob/72a1767e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java new file mode 100644 index 0000000..b4899e5 --- /dev/null +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java @@ -0,0 +1,481 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.salesforce.internal.client; + +import org.eclipse.jetty.client.ContentExchange; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.HttpExchange; +import org.eclipse.jetty.http.HttpHeaders; +import org.eclipse.jetty.http.HttpMethods; +import org.eclipse.jetty.io.ByteArrayBuffer; +import org.eclipse.jetty.util.StringUtil; +import org.apache.camel.component.salesforce.api.SalesforceException; +import org.apache.camel.component.salesforce.api.dto.RestError; +import org.apache.camel.component.salesforce.api.dto.bulk.*; +import org.apache.camel.component.salesforce.api.dto.bulk.Error; +import org.apache.camel.component.salesforce.internal.SalesforceSession; + +import javax.xml.bind.*; +import javax.xml.transform.stream.StreamSource; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.util.Arrays; +import java.util.Collections; + +public class DefaultBulkApiClient extends AbstractClientBase implements BulkApiClient { + + private static final String TOKEN_HEADER = "X-SFDC-Session"; + + private JAXBContext context; + private static final ContentType DEFAULT_ACCEPT_TYPE = ContentType.XML; + private ObjectFactory objectFactory; + + public DefaultBulkApiClient(String version, + SalesforceSession session, HttpClient httpClient) throws SalesforceException { + super(version, session, httpClient); + + try { + context = JAXBContext.newInstance(JobInfo.class.getPackage().getName(), getClass().getClassLoader()); + } catch (JAXBException e) { + String msg = "Error loading Bulk API DTOs: " + e.getMessage(); + throw new IllegalArgumentException(msg, e); + } + + this.objectFactory = new ObjectFactory(); + } + + @Override + public void createJob(JobInfo request, final JobInfoResponseCallback callback) { + + // clear system fields if set + sanitizeJobRequest(request); + + final ContentExchange post = getContentExchange(HttpMethods.POST, jobUrl(null)); + try { + marshalRequest(objectFactory.createJobInfo(request), post, APPLICATION_XML_UTF8); + } catch (SalesforceException e) { + callback.onResponse(null, e); + return; + } + + // make the call and parse the result in callback + doHttpRequest(post, new ClientResponseCallback() { + @Override + public void onResponse(InputStream response, SalesforceException ex) { + JobInfo value = null; + if (response != null) { + try { + value = unmarshalResponse(response, post, JobInfo.class); + } catch (SalesforceException e) { + ex = e; + } + } + callback.onResponse(value, ex); + } + }); + + } + + // reset read only fields + private void sanitizeJobRequest(JobInfo request) { + request.setApexProcessingTime(null); + request.setApiActiveProcessingTime(null); + request.setApiVersion(null); + request.setCreatedById(null); + request.setCreatedDate(null); + request.setId(null); + request.setNumberBatchesCompleted(null); + request.setNumberBatchesFailed(null); + request.setNumberBatchesInProgress(null); + request.setNumberBatchesQueued(null); + request.setNumberBatchesTotal(null); + request.setNumberRecordsFailed(null); + request.setNumberRecordsProcessed(null); + request.setNumberRetries(null); + request.setState(null); + request.setSystemModstamp(null); + request.setSystemModstamp(null); + } + + @Override + public void getJob(String jobId, final JobInfoResponseCallback callback) { + + final ContentExchange get = getContentExchange(HttpMethods.GET, jobUrl(jobId)); + + // make the call and parse the result + doHttpRequest(get, new ClientResponseCallback() { + @Override + public void onResponse(InputStream response, SalesforceException ex) { + JobInfo value = null; + try { + value = unmarshalResponse(response, get, JobInfo.class); + } catch (SalesforceException e) { + ex = e; + } + callback.onResponse(value, ex); + } + }); + + } + + @Override + public void closeJob(String jobId, final JobInfoResponseCallback callback) { + final JobInfo request = new JobInfo(); + request.setState(JobStateEnum.CLOSED); + + final ContentExchange post = getContentExchange(HttpMethods.POST, jobUrl(jobId)); + try { + marshalRequest(objectFactory.createJobInfo(request), post, APPLICATION_XML_UTF8); + } catch (SalesforceException e) { + callback.onResponse(null, e); + return; + } + + // make the call and parse the result + doHttpRequest(post, new ClientResponseCallback() { + @Override + public void onResponse(InputStream response, SalesforceException ex) { + JobInfo value = null; + try { + value = unmarshalResponse(response, post, JobInfo.class); + } catch (SalesforceException e) { + ex = e; + } + callback.onResponse(value, ex); + } + }); + + } + + @Override + public void abortJob(String jobId, final JobInfoResponseCallback callback) { + final JobInfo request = new JobInfo(); + request.setState(JobStateEnum.ABORTED); + + final ContentExchange post = getContentExchange(HttpMethods.POST, jobUrl(jobId)); + try { + marshalRequest(objectFactory.createJobInfo(request), post, APPLICATION_XML_UTF8); + } catch (SalesforceException e) { + callback.onResponse(null, e); + return; + } + + // make the call and parse the result + doHttpRequest(post, new ClientResponseCallback() { + @Override + public void onResponse(InputStream response, SalesforceException ex) { + JobInfo value = null; + try { + value = unmarshalResponse(response, post, JobInfo.class); + } catch (SalesforceException e) { + ex = e; + } + callback.onResponse(value, ex); + } + }); + + } + + @Override + public void createBatch(InputStream batchStream, String jobId, ContentType contentTypeEnum, + final BatchInfoResponseCallback callback) { + + final ContentExchange post = getContentExchange(HttpMethods.POST, batchUrl(jobId, null)); + post.setRequestContentSource(batchStream); + post.setRequestContentType(getContentType(contentTypeEnum) + ";charset=" + StringUtil.__UTF8); + + // make the call and parse the result + doHttpRequest(post, new ClientResponseCallback() { + @Override + public void onResponse(InputStream response, SalesforceException ex) { + BatchInfo value = null; + try { + value = unmarshalResponse(response, post, BatchInfo.class); + } catch (SalesforceException e) { + ex = e; + } + callback.onResponse(value, ex); + } + }); + + } + + @Override + public void getBatch(String jobId, String batchId, + final BatchInfoResponseCallback callback) { + + final ContentExchange get = getContentExchange(HttpMethods.GET, batchUrl(jobId, batchId)); + + // make the call and parse the result + doHttpRequest(get, new ClientResponseCallback() { + @Override + public void onResponse(InputStream response, SalesforceException ex) { + BatchInfo value = null; + try { + value = unmarshalResponse(response, get, BatchInfo.class); + } catch (SalesforceException e) { + ex = e; + } + callback.onResponse(value, ex); + } + }); + + } + + @Override + public void getAllBatches(String jobId, + final BatchInfoListResponseCallback callback) { + + final ContentExchange get = getContentExchange(HttpMethods.GET, batchUrl(jobId, null)); + + // make the call and parse the result + doHttpRequest(get, new ClientResponseCallback() { + @Override + public void onResponse(InputStream response, SalesforceException ex) { + BatchInfoList value = null; + try { + value = unmarshalResponse(response, get, BatchInfoList.class); + } catch (SalesforceException e) { + ex = e; + } + callback.onResponse(value != null ? value.getBatchInfo() : null, ex); + } + }); + + } + + @Override + public void getRequest(String jobId, String batchId, + final StreamResponseCallback callback) { + + final ContentExchange get = getContentExchange(HttpMethods.GET, batchUrl(jobId, batchId)); + + // make the call and parse the result + doHttpRequest(get, new ClientResponseCallback() { + @Override + public void onResponse(InputStream response, SalesforceException ex) { + callback.onResponse(response, ex); + } + }); + + } + + @Override + public void getResults(String jobId, String batchId, + final StreamResponseCallback callback) { + final ContentExchange get = getContentExchange(HttpMethods.GET, batchResultUrl(jobId, batchId, null)); + + // make the call and return the result + doHttpRequest(get, new ClientResponseCallback() { + @Override + public void onResponse(InputStream response, SalesforceException ex) { + callback.onResponse(response, ex); + } + }); + } + + @Override + public void createBatchQuery(String jobId, String soqlQuery, ContentType jobContentType, + final BatchInfoResponseCallback callback) { + + final ContentExchange post = getContentExchange(HttpMethods.POST, batchUrl(jobId, null)); + byte[] queryBytes = soqlQuery.getBytes(StringUtil.__UTF8_CHARSET); + post.setRequestContent(new ByteArrayBuffer(queryBytes)); + post.setRequestContentType(getContentType(jobContentType) + ";charset=" + StringUtil.__UTF8); + + // make the call and parse the result + doHttpRequest(post, new ClientResponseCallback() { + @Override + public void onResponse(InputStream response, SalesforceException ex) { + BatchInfo value = null; + try { + value = unmarshalResponse(response, post, BatchInfo.class); + } catch (SalesforceException e) { + ex = e; + } + callback.onResponse(value, ex); + } + }); + + } + + @Override + public void getQueryResultIds(String jobId, String batchId, + final QueryResultIdsCallback callback) { + final ContentExchange get = getContentExchange(HttpMethods.GET, batchResultUrl(jobId, batchId, null)); + + // make the call and parse the result + doHttpRequest(get, new ClientResponseCallback() { + @Override + public void onResponse(InputStream response, SalesforceException ex) { + QueryResultList value = null; + try { + value = unmarshalResponse(response, get, QueryResultList.class); + } catch (SalesforceException e) { + ex = e; + } + callback.onResponse(value != null ? Collections.unmodifiableList(value.getResult()) : null, + ex); + } + }); + + } + + @Override + public void getQueryResult(String jobId, String batchId, String resultId, + final StreamResponseCallback callback) { + final ContentExchange get = getContentExchange(HttpMethods.GET, batchResultUrl(jobId, batchId, resultId)); + + // make the call and parse the result + doHttpRequest(get, new ClientResponseCallback() { + @Override + public void onResponse(InputStream response, SalesforceException ex) { + callback.onResponse(response, ex); + } + }); + + } + + @Override + protected void setAccessToken(HttpExchange httpExchange) { + httpExchange.setRequestHeader(TOKEN_HEADER, accessToken); + } + + @Override + protected void doHttpRequest(ContentExchange request, ClientResponseCallback callback) { + // set access token for all requests + setAccessToken(request); + + // set default charset + request.setRequestHeader(HttpHeaders.ACCEPT_CHARSET, StringUtil.__UTF8); + + // TODO check if this is really needed or not, since SF response content type seems fixed + // check if the default accept content type must be used + if (!request.getRequestFields().containsKey(HttpHeaders.ACCEPT)) { + final String contentType = getContentType(DEFAULT_ACCEPT_TYPE); + request.setRequestHeader(HttpHeaders.ACCEPT, contentType); + // request content type and charset is set by the request entity + } + + super.doHttpRequest(request, callback); + } + + private static String getContentType(ContentType type) { + String result = null; + + switch (type) { + case CSV: + result = "text/csv"; + break; + + case XML: + result = "application/xml"; + break; + + case ZIP_CSV: + case ZIP_XML: + result = type.toString().toLowerCase().replace('_', '/'); + break; + } + + return result; + } + + @Override + protected SalesforceException createRestException(ContentExchange request) { + // this must be of type Error + try { + final Error error = unmarshalResponse(new ByteArrayInputStream(request.getResponseContentBytes()), + request, Error.class); + + final RestError restError = new RestError(); + restError.setErrorCode(error.getExceptionCode()); + restError.setMessage(error.getExceptionMessage()); + + return new SalesforceException(Arrays.asList(restError), request.getResponseStatus()); + } catch (SalesforceException e) { + String msg = "Error un-marshaling Salesforce Error: " + e.getMessage(); + return new SalesforceException(msg, e); + } + } + + private <T> T unmarshalResponse(InputStream response, ContentExchange request, Class<T> resultClass) + throws SalesforceException { + try { + Unmarshaller unmarshaller = context.createUnmarshaller(); + JAXBElement<T> result = unmarshaller.unmarshal(new StreamSource(response), resultClass); + return result.getValue(); + } catch (JAXBException e) { + throw new SalesforceException( + String.format("Error unmarshaling response {%s:%s} : %s", + request.getMethod(), request.getRequestURI(), e.getMessage()), + e); + } catch (IllegalArgumentException e) { + throw new SalesforceException( + String.format("Error unmarshaling response for {%s:%s} : %s", + request.getMethod(), request.getRequestURI(), e.getMessage()), + e); + } + } + + private void marshalRequest(Object input, ContentExchange request, String contentType) + throws SalesforceException { + try { + Marshaller marshaller = context.createMarshaller(); + ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + marshaller.marshal(input, byteStream); + request.setRequestContent(new ByteArrayBuffer(byteStream.toByteArray())); + request.setRequestContentType(contentType); + } catch (JAXBException e) { + throw new SalesforceException( + String.format("Error marshaling request for {%s:%s} : %s", + request.getMethod(), request.getRequestURI(), e.getMessage()), + e); + } catch (IllegalArgumentException e) { + throw new SalesforceException( + String.format("Error marshaling request for {%s:%s} : %s", + request.getMethod(), request.getRequestURI(), e.getMessage()), + e); + } + } + + private String jobUrl(String jobId) { + if (jobId != null) { + return super.instanceUrl + "/services/async/" + version + "/job/" + jobId; + } else { + return super.instanceUrl + "/services/async/" + version + "/job"; + } + } + + private String batchUrl(String jobId, String batchId) { + if (batchId != null) { + return jobUrl(jobId) + "/batch/" + batchId; + } else { + return jobUrl(jobId) + "/batch"; + } + } + + private String batchResultUrl(String jobId, String batchId, String resultId) { + if (resultId != null) { + return batchUrl(jobId, batchId) + "/result/" + resultId; + } else { + return batchUrl(jobId, batchId) + "/result"; + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/72a1767e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultRestClient.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultRestClient.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultRestClient.java new file mode 100644 index 0000000..07f5af2 --- /dev/null +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultRestClient.java @@ -0,0 +1,364 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.salesforce.internal.client; + +import com.thoughtworks.xstream.XStream; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; +import org.eclipse.jetty.client.ContentExchange; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.HttpExchange; +import org.eclipse.jetty.http.HttpHeaders; +import org.eclipse.jetty.http.HttpMethods; +import org.eclipse.jetty.util.StringUtil; +import org.apache.camel.component.salesforce.api.SalesforceException; +import org.apache.camel.component.salesforce.api.dto.RestError; +import org.apache.camel.component.salesforce.internal.SalesforceSession; +import org.apache.camel.component.salesforce.internal.dto.RestErrors; + +import java.io.IOException; +import java.io.InputStream; +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.util.List; + +public class DefaultRestClient extends AbstractClientBase implements RestClient { + + private static final String SERVICES_DATA = "/services/data/"; + private static final String TOKEN_HEADER = "Authorization"; + private static final String TOKEN_PREFIX = "Bearer "; + + private ObjectMapper objectMapper; + private XStream xStream; + protected String format; + + public DefaultRestClient(HttpClient httpClient, + String version, String format, SalesforceSession session) throws SalesforceException { + super(version, session, httpClient); + + this.format = format; + + // initialize error parsers for JSON and XML + this.objectMapper = new ObjectMapper(); + this.xStream = new XStream(); + xStream.processAnnotations(RestErrors.class); + } + + @Override + protected void doHttpRequest(ContentExchange request, ClientResponseCallback callback) { + // set standard headers for all requests + final String contentType = "json".equals(format) ? APPLICATION_JSON_UTF8 : APPLICATION_XML_UTF8; + request.setRequestHeader(HttpHeaders.ACCEPT, contentType); + request.setRequestHeader(HttpHeaders.ACCEPT_CHARSET, StringUtil.__UTF8); + // request content type and charset is set by the request entity + + super.doHttpRequest(request, callback); + } + + @Override + protected SalesforceException createRestException(ContentExchange httpExchange) { + // try parsing response according to format + try { + if ("json".equals(format)) { + List<RestError> restErrors = objectMapper.readValue( + httpExchange.getResponseContent(), new TypeReference<List<RestError>>() { + }); + return new SalesforceException(restErrors, httpExchange.getResponseStatus()); + } else { + RestErrors errors = new RestErrors(); + xStream.fromXML(httpExchange.getResponseContent(), errors); + return new SalesforceException(errors.getErrors(), httpExchange.getResponseStatus()); + } + } catch (IOException e) { + // log and ignore + String msg = "Unexpected Error parsing " + format + " error response: " + e.getMessage(); + LOG.warn(msg, e); + } catch (RuntimeException e) { + // log and ignore + String msg = "Unexpected Error parsing " + format + " error response: " + e.getMessage(); + LOG.warn(msg, e); + } + + // just report HTTP status info + return new SalesforceException("Unexpected error", httpExchange.getResponseStatus()); + } + + @Override + public void getVersions(final ResponseCallback callback) { + ContentExchange get = getContentExchange(HttpMethods.GET, servicesDataUrl()); + // does not require authorization token + + doHttpRequest(get, new DelegatingClientCallback(callback)); + } + + @Override + public void getResources(ResponseCallback callback) { + ContentExchange get = getContentExchange(HttpMethods.GET, versionUrl()); + // requires authorization token + setAccessToken(get); + + doHttpRequest(get, new DelegatingClientCallback(callback)); + } + + @Override + public void getGlobalObjects(ResponseCallback callback) { + ContentExchange get = getContentExchange(HttpMethods.GET, sobjectsUrl("")); + // requires authorization token + setAccessToken(get); + + doHttpRequest(get, new DelegatingClientCallback(callback)); + } + + @Override + public void getBasicInfo(String sObjectName, + ResponseCallback callback) { + ContentExchange get = getContentExchange(HttpMethods.GET, sobjectsUrl(sObjectName + "/")); + // requires authorization token + setAccessToken(get); + + doHttpRequest(get, new DelegatingClientCallback(callback)); + } + + @Override + public void getDescription(String sObjectName, + ResponseCallback callback) { + ContentExchange get = getContentExchange(HttpMethods.GET, sobjectsUrl(sObjectName + "/describe/")); + // requires authorization token + setAccessToken(get); + + doHttpRequest(get, new DelegatingClientCallback(callback)); + } + + @Override + public void getSObject(String sObjectName, String id, String[] fields, + ResponseCallback callback) { + + // parse fields if set + String params = ""; + if (fields != null && fields.length > 0) { + StringBuilder fieldsValue = new StringBuilder("?fields="); + for (int i = 0; i < fields.length; i++) { + fieldsValue.append(fields[i]); + if (i < (fields.length - 1)) { + fieldsValue.append(','); + } + } + params = fieldsValue.toString(); + } + ContentExchange get = getContentExchange(HttpMethods.GET, sobjectsUrl(sObjectName + "/" + id + params)); + // requires authorization token + setAccessToken(get); + + doHttpRequest(get, new DelegatingClientCallback(callback)); + } + + @Override + public void createSObject(String sObjectName, InputStream sObject, + ResponseCallback callback) { + // post the sObject + final ContentExchange post = getContentExchange(HttpMethods.POST, sobjectsUrl(sObjectName)); + + // authorization + setAccessToken(post); + + // input stream as entity content + post.setRequestContentSource(sObject); + post.setRequestContentType("json".equals(format) ? APPLICATION_JSON_UTF8 : APPLICATION_XML_UTF8); + + doHttpRequest(post, new DelegatingClientCallback(callback)); + } + + @Override + public void updateSObject(String sObjectName, String id, InputStream sObject, + ResponseCallback callback) { + final ContentExchange patch = getContentExchange("PATCH", sobjectsUrl(sObjectName + "/" + id)); + // requires authorization token + setAccessToken(patch); + + // input stream as entity content + patch.setRequestContentSource(sObject); + patch.setRequestContentType("json".equals(format) ? APPLICATION_JSON_UTF8 : APPLICATION_XML_UTF8); + + doHttpRequest(patch, new DelegatingClientCallback(callback)); + } + + @Override + public void deleteSObject(String sObjectName, String id, + ResponseCallback callback) { + final ContentExchange delete = getContentExchange(HttpMethods.DELETE, sobjectsUrl(sObjectName + "/" + id)); + + // requires authorization token + setAccessToken(delete); + + doHttpRequest(delete, new DelegatingClientCallback(callback)); + } + + @Override + public void getSObjectWithId(String sObjectName, String fieldName, String fieldValue, + ResponseCallback callback) { + final ContentExchange get = getContentExchange(HttpMethods.GET, + sobjectsExternalIdUrl(sObjectName, fieldName, fieldValue)); + + // requires authorization token + setAccessToken(get); + + doHttpRequest(get, new DelegatingClientCallback(callback)); + } + + @Override + public void upsertSObject(String sObjectName, String fieldName, String fieldValue, InputStream sObject, + ResponseCallback callback) { + final ContentExchange patch = getContentExchange("PATCH", + sobjectsExternalIdUrl(sObjectName, fieldName, fieldValue)); + + // requires authorization token + setAccessToken(patch); + + // input stream as entity content + patch.setRequestContentSource(sObject); + // TODO will the encoding always be UTF-8?? + patch.setRequestContentType("json".equals(format) ? APPLICATION_JSON_UTF8 : APPLICATION_XML_UTF8); + + doHttpRequest(patch, new DelegatingClientCallback(callback)); + } + + @Override + public void deleteSObjectWithId(String sObjectName, String fieldName, String fieldValue, + ResponseCallback callback) { + final ContentExchange delete = getContentExchange(HttpMethods.DELETE, + sobjectsExternalIdUrl(sObjectName, fieldName, fieldValue)); + + // requires authorization token + setAccessToken(delete); + + doHttpRequest(delete, new DelegatingClientCallback(callback)); + } + + @Override + public void getBlobField(String sObjectName, String id, String blobFieldName, ResponseCallback callback) { + final ContentExchange get = getContentExchange(HttpMethods.GET, + sobjectsUrl(sObjectName + "/" + id +"/" + blobFieldName)); + // TODO this doesn't seem to be required, the response is always the content binary stream + //get.setRequestHeader(HttpHeaders.ACCEPT_ENCODING, "base64"); + + // requires authorization token + setAccessToken(get); + + doHttpRequest(get, new DelegatingClientCallback(callback)); + } + + @Override + public void query(String soqlQuery, ResponseCallback callback) { + try { + + String encodedQuery = URLEncoder.encode(soqlQuery, StringUtil.__UTF8_CHARSET.toString()); + // URLEncoder likes to use '+' for spaces + encodedQuery = encodedQuery.replace("+", "%20"); + final ContentExchange get = getContentExchange(HttpMethods.GET, versionUrl() + "query/?q=" + encodedQuery); + + // requires authorization token + setAccessToken(get); + + doHttpRequest(get, new DelegatingClientCallback(callback)); + + } catch (UnsupportedEncodingException e) { + String msg = "Unexpected error: " + e.getMessage(); + callback.onResponse(null, new SalesforceException(msg, e)); + } + } + + @Override + public void queryMore(String nextRecordsUrl, ResponseCallback callback) { + final ContentExchange get = getContentExchange(HttpMethods.GET, instanceUrl + nextRecordsUrl); + + // requires authorization token + setAccessToken(get); + + doHttpRequest(get, new DelegatingClientCallback(callback)); + } + + @Override + public void search(String soslQuery, ResponseCallback callback) { + try { + + String encodedQuery = URLEncoder.encode(soslQuery, StringUtil.__UTF8_CHARSET.toString()); + // URLEncoder likes to use '+' for spaces + encodedQuery = encodedQuery.replace("+", "%20"); + final ContentExchange get = getContentExchange(HttpMethods.GET, versionUrl() + "search/?q=" + encodedQuery); + + // requires authorization token + setAccessToken(get); + + doHttpRequest(get, new DelegatingClientCallback(callback)); + + } catch (UnsupportedEncodingException e) { + String msg = "Unexpected error: " + e.getMessage(); + callback.onResponse(null, new SalesforceException(msg, e)); + } + } + + private String servicesDataUrl() { + return instanceUrl + SERVICES_DATA; + } + + private String versionUrl() { + if (version == null) { + throw new IllegalArgumentException("NULL API version", new NullPointerException("version")); + } + return servicesDataUrl() + "v" + version + "/"; + } + + private String sobjectsUrl(String sObjectName) { + if (sObjectName == null) { + throw new IllegalArgumentException("Null SObject name", new NullPointerException("sObjectName")); + } + return versionUrl() + "sobjects/" + sObjectName; + } + + private String sobjectsExternalIdUrl(String sObjectName, String fieldName, String fieldValue) { + if (fieldName == null || fieldValue == null) { + throw new IllegalArgumentException("External field name and value cannot be NULL"); + } + try { + String encodedValue = URLEncoder.encode(fieldValue, StringUtil.__UTF8_CHARSET.toString()); + // URLEncoder likes to use '+' for spaces + encodedValue = encodedValue.replace("+", "%20"); + return sobjectsUrl(sObjectName + "/" + fieldName + "/" + encodedValue); + } catch (UnsupportedEncodingException e) { + String msg = "Unexpected error: " + e.getMessage(); + throw new IllegalArgumentException(msg, e); + } + } + + protected void setAccessToken(HttpExchange httpExchange) { + httpExchange.setRequestHeader(TOKEN_HEADER, TOKEN_PREFIX + accessToken); + } + + private static class DelegatingClientCallback implements ClientResponseCallback { + private final ResponseCallback callback; + + public DelegatingClientCallback(ResponseCallback callback) { + this.callback = callback; + } + + @Override + public void onResponse(InputStream response, SalesforceException ex) { + callback.onResponse(response, ex); + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/72a1767e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/RestClient.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/RestClient.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/RestClient.java new file mode 100644 index 0000000..30186d9 --- /dev/null +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/RestClient.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.salesforce.internal.client; + +import org.apache.camel.component.salesforce.api.SalesforceException; + +import java.io.InputStream; + +public interface RestClient { + + public static interface ResponseCallback { + void onResponse(InputStream response, SalesforceException exception); + } + + /** + * Lists summary information about each API version currently available, + * including the version, label, and a link to each version's root. + * + * @param callback {@link ResponseCallback} to handle response or exception + */ + void getVersions(ResponseCallback callback); + + /** + * Lists available resources for the specified API version, including resource name and URI. + * + * @param callback {@link ResponseCallback} to handle response or exception + */ + void getResources(ResponseCallback callback); + + /** + * Lists the available objects and their metadata for your organization's data. + * + * @param callback {@link ResponseCallback} to handle response or exception + */ + void getGlobalObjects(ResponseCallback callback); + + /** + * Describes the individual metadata for the specified object. + * + * @param sObjectName specified object name + * @param callback {@link ResponseCallback} to handle response or exception + */ + void getBasicInfo(String sObjectName, ResponseCallback callback); + + /** + * Completely describes the individual metadata at all levels for the specified object. + * + * @param sObjectName specified object name + * @param callback {@link ResponseCallback} to handle response or exception + */ + void getDescription(String sObjectName, ResponseCallback callback); + + /** + * Retrieves a record for the specified object ID. + * + * @param sObjectName specified object name + * @param id object id + * @param callback {@link ResponseCallback} to handle response or exception + */ + void getSObject(String sObjectName, String id, String[] fields, ResponseCallback callback); + + /** + * Creates a record for the specified object. + * + * @param sObjectName specified object name + * @param sObject request entity + * @param callback {@link ResponseCallback} to handle response or exception + */ + void createSObject(String sObjectName, InputStream sObject, ResponseCallback callback); + + /** + * Updates a record for the specified object ID. + * + * @param sObjectName specified object name + * @param id object id + * @param sObject request entity + * @param callback {@link ResponseCallback} to handle response or exception + */ + void updateSObject(String sObjectName, String id, InputStream sObject, ResponseCallback callback); + + /** + * Deletes a record for the specified object ID. + * + * @param sObjectName specified object name + * @param id object id + * @param callback {@link ResponseCallback} to handle response or exception + */ + void deleteSObject(String sObjectName, String id, ResponseCallback callback); + + /** + * Retrieves a record for the specified external ID. + * + * @param sObjectName specified object name + * @param fieldName external field name + * @param fieldValue external field value + * @param callback {@link ResponseCallback} to handle response or exception + */ + void getSObjectWithId(String sObjectName, String fieldName, String fieldValue, ResponseCallback callback); + + /** + * Creates or updates a record based on the value of a specified external ID field. + * + * @param sObjectName specified object name + * @param fieldName external field name + * @param fieldValue external field value + * @param sObject input object to insert or update + * @param callback {@link ResponseCallback} to handle response or exception + */ + void upsertSObject(String sObjectName, + String fieldName, String fieldValue, InputStream sObject, ResponseCallback callback); + + /** + * Deletes a record based on the value of a specified external ID field. + * + * @param sObjectName specified object name + * @param fieldName external field name + * @param fieldValue external field value + * @param callback {@link ResponseCallback} to handle response or exception + */ + void deleteSObjectWithId(String sObjectName, + String fieldName, String fieldValue, ResponseCallback callback); + + + /** + * Retrieves the specified blob field from an individual record. + * + */ + void getBlobField(String sObjectName, String id, String blobFieldName, ResponseCallback callback); + +/* + TODO + SObject User Password + /vXX.X/sobjects/User/user id/password + /vXX.X/sobjects/SelfServiceUser/self service user id/password + + These methods set, reset, or get information about a user password. +*/ + + /** + * Executes the specified SOQL query. + * + * @param soqlQuery SOQL query + * @param callback {@link ResponseCallback} to handle response or exception + */ + void query(String soqlQuery, ResponseCallback callback); + + /** + * Get SOQL query results using nextRecordsUrl. + * + * @param nextRecordsUrl URL for next records to fetch, returned by query() + * @param callback {@link ResponseCallback} to handle response or exception + */ + void queryMore(String nextRecordsUrl, ResponseCallback callback); + + /** + * Executes the specified SOSL search. + * + * @param soslQuery SOSL query + * @param callback {@link ResponseCallback} to handle response or exception + */ + void search(String soslQuery, ResponseCallback callback); + +} http://git-wip-us.apache.org/repos/asf/camel/blob/72a1767e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceExchange.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceExchange.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceExchange.java new file mode 100644 index 0000000..b17c5e1 --- /dev/null +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceExchange.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.salesforce.internal.client; + +import org.eclipse.jetty.client.ContentExchange; + +/** + * Wraps a Salesforce Http Exchange + */ +public class SalesforceExchange extends ContentExchange { + + private AbstractClientBase client; + + public AbstractClientBase getClient() { + return client; + } + + public void setClient(AbstractClientBase client) { + this.client = client; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/72a1767e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceSecurityListener.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceSecurityListener.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceSecurityListener.java new file mode 100644 index 0000000..5eec212 --- /dev/null +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceSecurityListener.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.salesforce.internal.client; + +import org.eclipse.jetty.client.HttpDestination; +import org.eclipse.jetty.client.HttpEventListenerWrapper; +import org.eclipse.jetty.client.HttpExchange; +import org.eclipse.jetty.http.HttpHeaders; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.io.Buffer; +import org.apache.camel.component.salesforce.api.SalesforceException; +import org.apache.camel.component.salesforce.internal.SalesforceSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class SalesforceSecurityListener extends HttpEventListenerWrapper { + + private static final Logger LOG = LoggerFactory.getLogger(SalesforceSecurityListener.class); + + private final HttpDestination destination; + private final HttpExchange exchange; + private final SalesforceSession session; + + private String currentToken; + private int retries; + private boolean retrying; + private boolean requestComplete; + private boolean responseComplete; + + public SalesforceSecurityListener(HttpDestination destination, HttpExchange exchange, + SalesforceSession session, String accessToken) { + super(exchange.getEventListener(), true); + this.destination = destination; + this.exchange = exchange; + this.session = session; + this.currentToken = accessToken; + } + + @Override + public void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException { + if (status == HttpStatus.UNAUTHORIZED_401 && + retries < destination.getHttpClient().maxRetries()) { + + LOG.warn("Retrying on Salesforce authentication error [{}]: [{}]", status, reason); + setDelegatingRequests(false); + setDelegatingResponses(false); + + retrying = true; + } + super.onResponseStatus(version, status, reason); + } + + @Override + public void onRequestComplete() throws IOException { + requestComplete = true; + if (checkExchangeComplete()) { + super.onRequestComplete(); + } + } + + @Override + public void onResponseComplete() throws IOException { + responseComplete = true; + if (checkExchangeComplete()) { + super.onResponseComplete(); + } + } + + private boolean checkExchangeComplete() throws IOException { + if (retrying && requestComplete && responseComplete) { + LOG.debug("Authentication Error, retrying: {}", exchange); + + requestComplete = false; + responseComplete = false; + + setDelegatingRequests(true); + setDelegatingResponses(true); + + try { + // get a new token and retry + currentToken = session.login(currentToken); + + if (exchange instanceof SalesforceExchange) { + final SalesforceExchange salesforceExchange = (SalesforceExchange) exchange; + final AbstractClientBase client = salesforceExchange.getClient(); + + // update client cache for this and future requests + client.setAccessToken(currentToken); + client.setInstanceUrl(session.getInstanceUrl()); + client.setAccessToken(exchange); + } else { + exchange.addRequestHeader(HttpHeaders.AUTHORIZATION, + "OAuth " + currentToken); + } + + // TODO handle a change in Salesforce instanceUrl, right now we retry with the same destination + destination.resend(exchange); + + // resending, exchange is not done + return false; + + } catch (SalesforceException e) { + // logging here, since login exception is not propagated! + LOG.error(e.getMessage(), e); + + // the HTTP status and reason is pushed up + setDelegationResult(false); + } + } + + return true; + } + + @Override + public void onRetry() { + // ignore retries from other interceptors + if (retrying) { + retrying = false; + retries++; + + setDelegatingRequests(true); + setDelegatingResponses(true); + + requestComplete = false; + responseComplete = false; + } + super.onRetry(); + } + + @Override + public void onConnectionFailed(Throwable ex) { + setDelegatingRequests(true); + setDelegatingResponses(true); + // delegate connection failures + super.onConnectionFailed(ex); + } + + @Override + public void onException(Throwable ex) { + setDelegatingRequests(true); + setDelegatingResponses(true); + // delegate exceptions + super.onException(ex); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/72a1767e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SyncResponseCallback.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SyncResponseCallback.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SyncResponseCallback.java new file mode 100644 index 0000000..0f567e6 --- /dev/null +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SyncResponseCallback.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.salesforce.internal.client; + +import org.apache.camel.component.salesforce.api.SalesforceException; + +import java.io.InputStream; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * Thin wrapper to handle callbacks for {@link RestClient.ResponseCallback} and allow waiting for results + */ +public class SyncResponseCallback implements RestClient.ResponseCallback { + + private InputStream response; + private SalesforceException exception; + private CountDownLatch latch = new CountDownLatch(1); + + @Override + public void onResponse(InputStream response, SalesforceException exception) { + this.response = response; + this.exception = exception; + latch.countDown(); + } + + public void reset() { + latch = new CountDownLatch(1); + } + + public boolean await(long duration, TimeUnit unit) throws InterruptedException { + return latch.await(duration, unit); + } + + public InputStream getResponse() { + return response; + } + + public SalesforceException getException() { + return exception; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/72a1767e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/LoginError.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/LoginError.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/LoginError.java new file mode 100644 index 0000000..cec3ef8 --- /dev/null +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/LoginError.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.salesforce.internal.dto; + +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * DTO for Salesforce login error + */ +public class LoginError { + + private String error; + + private String errorDescription; + + public String getError() { + return error; + } + + public void setError(String error) { + this.error = error; + } + + @JsonProperty("error_description") + public String getErrorDescription() { + return errorDescription; + } + + @JsonProperty("error_description") + public void setErrorDescription(String errorDescription) { + this.errorDescription = errorDescription; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/72a1767e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/LoginToken.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/LoginToken.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/LoginToken.java new file mode 100644 index 0000000..c23338e --- /dev/null +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/LoginToken.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.salesforce.internal.dto; + +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * DTO for Salesforce login + */ +public class LoginToken { + + private String accessToken; + + private String instanceUrl; + + private String id; + + private String signature; + + private String issuedAt; + + @JsonProperty("access_token") + public String getAccessToken() { + return accessToken; + } + + @JsonProperty("access_token") + public void setAccessToken(String accessToken) { + this.accessToken = accessToken; + } + + @JsonProperty("instance_url") + public String getInstanceUrl() { + return instanceUrl; + } + + @JsonProperty("instance_url") + public void setInstanceUrl(String instanceUrl) { + this.instanceUrl = instanceUrl; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getSignature() { + return signature; + } + + public void setSignature(String signature) { + this.signature = signature; + } + + @JsonProperty("issued_at") + public String getIssuedAt() { + return issuedAt; + } + + @JsonProperty("issued_at") + public void setIssuedAt(String issuedAt) { + this.issuedAt = issuedAt; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/72a1767e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/NotifyForFieldsEnum.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/NotifyForFieldsEnum.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/NotifyForFieldsEnum.java new file mode 100644 index 0000000..970b9aa --- /dev/null +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/NotifyForFieldsEnum.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.salesforce.internal.dto; + +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonValue; + +/** + * Salesforce Enumeration DTO for picklist NotifyForFields + */ +public enum NotifyForFieldsEnum { + + SELECT("Select"), + WHERE("Where"), + REFERENCED("Referenced"), + ALL("All"); + + final String value; + + private NotifyForFieldsEnum(String value) { + this.value = value; + } + + @JsonValue + public String value() { + return this.value; + } + + @JsonCreator + public static NotifyForFieldsEnum fromValue(String value) { + for (NotifyForFieldsEnum e : NotifyForFieldsEnum.values()) { + if (e.value.equals(value)) { + return e; + } + } + throw new IllegalArgumentException(value); + } + +} \ No newline at end of file