http://git-wip-us.apache.org/repos/asf/camel/blob/72a1767e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/NotifyForOperationsEnum.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/NotifyForOperationsEnum.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/NotifyForOperationsEnum.java new file mode 100644 index 0000000..807fef5 --- /dev/null +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/NotifyForOperationsEnum.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.salesforce.internal.dto; + +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonValue; + +/** + * Salesforce Enumeration DTO for picklist NotifyForOperations + */ +public enum NotifyForOperationsEnum { + + CREATE("Create"), + UPDATE("Update"), + ALL("All"); + + final String value; + + private NotifyForOperationsEnum(String value) { + this.value = value; + } + + @JsonValue + public String value() { + return this.value; + } + + @JsonCreator + public static NotifyForOperationsEnum forValue(String value) { + for (NotifyForOperationsEnum e : NotifyForOperationsEnum.values()) { + if (e.value.equals(value)) { + return e; + } + } + throw new IllegalArgumentException(value); + } + +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/72a1767e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/PushTopic.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/PushTopic.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/PushTopic.java new file mode 100644 index 0000000..2135a16 --- /dev/null +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/PushTopic.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.salesforce.internal.dto; + +import com.thoughtworks.xstream.annotations.XStreamAlias; +import com.thoughtworks.xstream.annotations.XStreamConverter; +import org.codehaus.jackson.annotate.JsonProperty; +import org.apache.camel.component.salesforce.api.PicklistEnumConverter; +import org.apache.camel.component.salesforce.api.dto.AbstractSObjectBase; + +/** + * Salesforce DTO for SObject PushTopic + */ +@XStreamAlias("PushTopic") +public class PushTopic extends AbstractSObjectBase { + + private String Query; + private Double ApiVersion; + private Boolean IsActive; + @XStreamConverter(PicklistEnumConverter.class) + private NotifyForFieldsEnum NotifyForFields; + @XStreamConverter(PicklistEnumConverter.class) + private NotifyForOperationsEnum NotifyForOperations; + private String Description; + + @JsonProperty("Query") + public String getQuery() { + return this.Query; + } + + @JsonProperty("Query") + public void setQuery(String Query) { + this.Query = Query; + } + + @JsonProperty("ApiVersion") + public Double getApiVersion() { + return this.ApiVersion; + } + + @JsonProperty("ApiVersion") + public void setApiVersion(Double ApiVersion) { + this.ApiVersion = ApiVersion; + } + + @JsonProperty("IsActive") + public Boolean getIsActive() { + return this.IsActive; + } + + @JsonProperty("IsActive") + public void setIsActive(Boolean IsActive) { + this.IsActive = IsActive; + } + + @JsonProperty("NotifyForFields") + public NotifyForFieldsEnum getNotifyForFields() { + return this.NotifyForFields; + } + + @JsonProperty("NotifyForFields") + public void setNotifyForFields(NotifyForFieldsEnum NotifyForFields) { + this.NotifyForFields = NotifyForFields; + } + + @JsonProperty("NotifyForOperations") + public NotifyForOperationsEnum getNotifyForOperations() { + return this.NotifyForOperations; + } + + @JsonProperty("NotifyForOperations") + public void setNotifyForOperations(NotifyForOperationsEnum NotifyForOperations) { + this.NotifyForOperations = 
NotifyForOperations; + } + + @JsonProperty("Description") + public String getDescription() { + return this.Description; + } + + @JsonProperty("Description") + public void setDescription(String Description) { + this.Description = Description; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/72a1767e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/QueryRecordsPushTopic.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/QueryRecordsPushTopic.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/QueryRecordsPushTopic.java new file mode 100644 index 0000000..4adc13c --- /dev/null +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/QueryRecordsPushTopic.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.camel.component.salesforce.internal.dto;

import com.thoughtworks.xstream.annotations.XStreamImplicit;
import org.apache.camel.component.salesforce.api.dto.AbstractQueryRecordsBase;

import java.util.List;

/**
 * Salesforce Query Records DTO for PushTopic.
 * <p>
 * Holds the {@link PushTopic} records of a query result; the list is mapped
 * as an XStream implicit collection.
 */
public class QueryRecordsPushTopic extends AbstractQueryRecordsBase {

    @XStreamImplicit
    private List<PushTopic> records;

    /**
     * @return the stored list of records; {@code null} until one has been set
     */
    public List<PushTopic> getRecords() {
        return this.records;
    }

    /**
     * @param pushTopics list to store; the reference is kept as-is (no copy)
     */
    public void setRecords(List<PushTopic> pushTopics) {
        this.records = pushTopics;
    }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/72a1767e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/RestErrors.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/RestErrors.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/RestErrors.java new file mode 100644 index 0000000..caf59fe --- /dev/null +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/RestErrors.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.camel.component.salesforce.internal.dto;

import com.thoughtworks.xstream.annotations.XStreamAlias;
import com.thoughtworks.xstream.annotations.XStreamImplicit;
import org.apache.camel.component.salesforce.api.dto.RestError;

import java.util.List;

/**
 * DTO for Salesforce REST errors.
 * <p>
 * Aliased to the {@code Errors} element for XStream; each list item is mapped
 * to an implicit {@code Error} child element.
 */
@XStreamAlias("Errors")
public class RestErrors {

    @XStreamImplicit(itemFieldName = "Error")
    private List<RestError> errors;

    /**
     * @return the stored list of errors; {@code null} until one has been set
     */
    public List<RestError> getErrors() {
        return this.errors;
    }

    /**
     * @param restErrors list to store; the reference is kept as-is (no copy)
     */
    public void setErrors(List<RestError> restErrors) {
        this.errors = restErrors;
    }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/72a1767e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractRestProcessor.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractRestProcessor.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractRestProcessor.java new file mode 100644 index 0000000..1e40d18 --- /dev/null +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractRestProcessor.java @@ -0,0 +1,538 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.camel.component.salesforce.internal.processor;

import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.component.salesforce.SalesforceEndpoint;
import org.apache.camel.component.salesforce.api.SalesforceException;
import org.apache.camel.component.salesforce.api.dto.AbstractSObjectBase;
import org.apache.camel.component.salesforce.internal.PayloadFormat;
import org.apache.camel.component.salesforce.internal.client.DefaultRestClient;
import org.apache.camel.component.salesforce.internal.client.RestClient;

import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;

import static org.apache.camel.component.salesforce.SalesforceEndpointConfig.*;

/**
 * Base processor that maps the endpoint's operation onto a {@link RestClient}
 * call.
 * <p>
 * {@link #process(Exchange, AsyncCallback)} first lets the subclass
 * pre-process the request, then dispatches on {@code operationName}. Every
 * REST call is handed a {@link RestClient.ResponseCallback} that forwards the
 * response to {@link #processResponse}. Subclasses supply the payload-specific
 * parts: {@link #processRequest}, {@link #getRequestStream} and
 * {@link #processResponse}.
 * <p>
 * For update/upsert/by-external-id operations some fields of an input
 * {@link AbstractSObjectBase} are cleared before the call and put back by the
 * response callback.
 * <p>
 * NOTE(review): if an exception is thrown after those fields were cleared but
 * before the REST call is issued (for example from {@code getRequestStream}
 * or {@code setResponseClass}), the catch blocks in {@code process} do not
 * restore them. Also, the callbacks restore fields <em>after</em> calling
 * {@code processResponse}; whether that is too late to report a restore
 * failure depends on when the subclass completes the callback - confirm.
 */
public abstract class AbstractRestProcessor extends AbstractSalesforceProcessor {

    /** Exchange property under which the resolved response class is stored. */
    protected static final String RESPONSE_CLASS = AbstractRestProcessor.class.getName() + ".responseClass";

    private final RestClient restClient;
    private final Map<String, Class<?>> classMap;

    /**
     * Creates the REST client from the endpoint configuration (API version and
     * lower-cased payload format name) and takes the component's class map.
     *
     * @param endpoint endpoint this processor serves
     * @throws SalesforceException declared for the REST client construction
     */
    public AbstractRestProcessor(SalesforceEndpoint endpoint) throws SalesforceException {
        super(endpoint);

        final PayloadFormat payloadFormat = endpoint.getConfiguration().getPayloadFormat();

        this.restClient = new DefaultRestClient(httpClient, endpointConfigMap.get(API_VERSION),
            payloadFormat.toString().toLowerCase(), session);

        this.classMap = endpoint.getComponent().getClassMap();
    }

    /** Starts the underlying REST client. */
    @Override
    public void start() throws Exception {
        ServiceHelper.startService(restClient);
    }

    /** Stops the underlying REST client. */
    @Override
    public void stop() throws Exception {
        ServiceHelper.stopService(restClient);
    }

    /**
     * Pre-processes the request and dispatches the configured operation to the
     * REST client.
     *
     * @param exchange exchange carrying the request; receives the exception on failure
     * @param callback invoked with {@code done(true)} when this method fails
     *                 before or while issuing the call
     * @return {@code true} if the exchange was completed here because of an
     *         error; {@code false} once a REST call has been handed a response
     *         callback
     */
    @Override
    public final boolean process(final Exchange exchange, final AsyncCallback callback) {

        // pre-process request message
        try {
            processRequest(exchange);
        } catch (SalesforceException e) {
            exchange.setException(e);
            callback.done(true);
            return true;
        } catch (RuntimeException e) {
            exchange.setException(new SalesforceException(e.getMessage(), e));
            callback.done(true);
            return true;
        }

        // call Salesforce asynchronously
        try {

            // call Operation using REST client
            switch (operationName) {
            case GET_VERSIONS:
                restClient.getVersions(new RestClient.ResponseCallback() {
                    @Override
                    public void onResponse(InputStream response, SalesforceException exception) {
                        // process response entity and create out message
                        processResponse(exchange, response, exception, callback);
                    }
                });
                break;

            case GET_RESOURCES:
                restClient.getResources(new RestClient.ResponseCallback() {
                    @Override
                    public void onResponse(InputStream response, SalesforceException exception) {
                        processResponse(exchange, response, exception, callback);
                    }
                });
                break;

            case GET_GLOBAL_OBJECTS:
                restClient.getGlobalObjects(new RestClient.ResponseCallback() {
                    @Override
                    public void onResponse(InputStream response, SalesforceException exception) {
                        processResponse(exchange, response, exception, callback);
                    }
                });
                break;

            case GET_BASIC_INFO:
                // declared here, assigned again by each later case that needs it
                String sObjectName = getParameter(SOBJECT_NAME, exchange, USE_BODY, NOT_OPTIONAL);
                restClient.getBasicInfo(sObjectName, new RestClient.ResponseCallback() {
                    @Override
                    public void onResponse(InputStream response, SalesforceException exception) {
                        processResponse(exchange, response, exception, callback);
                    }
                });
                break;

            case GET_DESCRIPTION:
                sObjectName = getParameter(SOBJECT_NAME, exchange, USE_BODY, NOT_OPTIONAL);
                restClient.getDescription(sObjectName, new RestClient.ResponseCallback() {
                    @Override
                    public void onResponse(InputStream response, SalesforceException exception) {
                        processResponse(exchange, response, exception, callback);
                    }
                });
                break;

            case GET_SOBJECT:
            {
                String sObjectIdValue;
                // determine parameters from input AbstractSObject
                final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
                if (sObjectBase != null) {
                    sObjectName = sObjectBase.getClass().getSimpleName();
                    sObjectIdValue = sObjectBase.getId();
                } else {
                    sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
                    sObjectIdValue = getParameter(SOBJECT_ID, exchange, USE_BODY, NOT_OPTIONAL);
                }
                final String sObjectId = sObjectIdValue;

                // use sObject name to load class
                setResponseClass(exchange, sObjectName);

                // get optional field list
                String fieldsValue = getParameter(SOBJECT_FIELDS, exchange, IGNORE_BODY, IS_OPTIONAL);
                String[] fields = null;
                if (fieldsValue != null) {
                    fields = fieldsValue.split(",");
                }

                restClient.getSObject(sObjectName, sObjectId, fields, new RestClient.ResponseCallback() {
                    @Override
                    public void onResponse(InputStream response, SalesforceException exception) {
                        processResponse(exchange, response, exception, callback);
                        restoreFields(exchange, sObjectBase, sObjectId, null, null);
                    }
                });

                break;
            }

            case CREATE_SOBJECT:
            {
                // determine parameters from input AbstractSObject
                AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
                if (sObjectBase != null) {
                    sObjectName = sObjectBase.getClass().getSimpleName();
                } else {
                    sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
                }

                restClient.createSObject(sObjectName, getRequestStream(exchange),
                    new RestClient.ResponseCallback() {
                        @Override
                        public void onResponse(InputStream response, SalesforceException exception) {
                            processResponse(exchange, response, exception, callback);
                        }
                    });

                break;
            }

            case UPDATE_SOBJECT:
            {
                // determine parameters from input AbstractSObject
                final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
                String sObjectId;
                if (sObjectBase != null) {
                    sObjectName = sObjectBase.getClass().getSimpleName();
                    // remember the sObject Id
                    sObjectId = sObjectBase.getId();
                    // clear base object fields, which cannot be updated
                    sObjectBase.clearBaseFields();
                } else {
                    sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
                    sObjectId = getParameter(SOBJECT_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
                }

                final String finalsObjectId = sObjectId;
                restClient.updateSObject(sObjectName, sObjectId, getRequestStream(exchange),
                    new RestClient.ResponseCallback() {
                        @Override
                        public void onResponse(InputStream response, SalesforceException exception) {
                            processResponse(exchange, response, exception, callback);
                            restoreFields(exchange, sObjectBase, finalsObjectId, null, null);
                        }
                    });

                break;
            }

            case DELETE_SOBJECT:
            {
                // determine parameters from input AbstractSObject
                final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
                String sObjectIdValue;
                if (sObjectBase != null) {
                    sObjectName = sObjectBase.getClass().getSimpleName();
                    sObjectIdValue = sObjectBase.getId();
                } else {
                    sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
                    sObjectIdValue = getParameter(SOBJECT_ID, exchange, USE_BODY, NOT_OPTIONAL);
                }
                final String sObjectId = sObjectIdValue;

                restClient.deleteSObject(sObjectName, sObjectId, new RestClient.ResponseCallback() {
                    @Override
                    public void onResponse(InputStream response, SalesforceException exception) {
                        processResponse(exchange, response, exception, callback);
                        restoreFields(exchange, sObjectBase, sObjectId, null, null);
                    }
                });
                break;
            }

            case GET_SOBJECT_WITH_ID:
            {
                Object oldValue = null;
                String sObjectExtIdValue;
                final String sObjectExtIdName = getParameter(SOBJECT_EXT_ID_NAME,
                    exchange, IGNORE_BODY, NOT_OPTIONAL);

                // determine parameters from input AbstractSObject
                final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
                if (sObjectBase != null) {
                    sObjectName = sObjectBase.getClass().getSimpleName();
                    oldValue = getAndClearPropertyValue(sObjectBase, sObjectExtIdName);
                    sObjectExtIdValue = toExternalIdValue(oldValue, sObjectExtIdName);
                } else {
                    sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
                    sObjectExtIdValue = getParameter(SOBJECT_EXT_ID_VALUE, exchange, USE_BODY, NOT_OPTIONAL);
                }

                // use sObject name to load class
                setResponseClass(exchange, sObjectName);

                final Object finalOldValue = oldValue;
                restClient.getSObjectWithId(sObjectName, sObjectExtIdName, sObjectExtIdValue,
                    new RestClient.ResponseCallback() {
                        @Override
                        public void onResponse(InputStream response, SalesforceException exception) {
                            processResponse(exchange, response, exception, callback);
                            restoreFields(exchange, sObjectBase, null, sObjectExtIdName, finalOldValue);
                        }
                    });

                break;
            }

            case UPSERT_SOBJECT:
            {
                String sObjectExtIdValue;
                final String sObjectExtIdName = getParameter(SOBJECT_EXT_ID_NAME, exchange,
                    IGNORE_BODY, NOT_OPTIONAL);

                // determine parameters from input AbstractSObject
                Object oldValue = null;
                final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
                if (sObjectBase != null) {
                    sObjectName = sObjectBase.getClass().getSimpleName();
                    oldValue = getAndClearPropertyValue(sObjectBase, sObjectExtIdName);
                    sObjectExtIdValue = toExternalIdValue(oldValue, sObjectExtIdName);
                    // clear base object fields, which cannot be updated
                    sObjectBase.clearBaseFields();
                } else {
                    sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
                    sObjectExtIdValue = getParameter(SOBJECT_EXT_ID_VALUE, exchange, IGNORE_BODY, NOT_OPTIONAL);
                }

                final Object finalOldValue = oldValue;
                restClient.upsertSObject(sObjectName, sObjectExtIdName, sObjectExtIdValue,
                    getRequestStream(exchange), new RestClient.ResponseCallback() {
                        @Override
                        public void onResponse(InputStream response, SalesforceException exception) {
                            processResponse(exchange, response, exception, callback);
                            restoreFields(exchange, sObjectBase, null, sObjectExtIdName, finalOldValue);
                        }
                    });

                break;
            }

            case DELETE_SOBJECT_WITH_ID:
            {
                final String sObjectExtIdName = getParameter(SOBJECT_EXT_ID_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);

                // determine parameters from input AbstractSObject
                Object oldValue = null;
                final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
                String sObjectExtIdValue;
                if (sObjectBase != null) {
                    sObjectName = sObjectBase.getClass().getSimpleName();
                    oldValue = getAndClearPropertyValue(sObjectBase, sObjectExtIdName);
                    sObjectExtIdValue = toExternalIdValue(oldValue, sObjectExtIdName);
                } else {
                    sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
                    sObjectExtIdValue = getParameter(SOBJECT_EXT_ID_VALUE, exchange, USE_BODY, NOT_OPTIONAL);
                }

                final Object finalOldValue = oldValue;
                restClient.deleteSObjectWithId(sObjectName, sObjectExtIdName, sObjectExtIdValue,
                    new RestClient.ResponseCallback() {
                        @Override
                        public void onResponse(InputStream response, SalesforceException exception) {
                            processResponse(exchange, response, exception, callback);
                            restoreFields(exchange, sObjectBase, null, sObjectExtIdName, finalOldValue);
                        }
                    });

                break;
            }

            case GET_BLOB_FIELD:
            {
                // get blob field name
                final String sObjectBlobFieldName = getParameter(SOBJECT_BLOB_FIELD_NAME,
                    exchange, IGNORE_BODY, NOT_OPTIONAL);

                // determine parameters from input AbstractSObject
                final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
                String sObjectIdValue;
                if (sObjectBase != null) {
                    sObjectName = sObjectBase.getClass().getSimpleName();
                    sObjectIdValue = sObjectBase.getId();
                } else {
                    sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
                    sObjectIdValue = getParameter(SOBJECT_ID, exchange, USE_BODY, NOT_OPTIONAL);
                }
                final String sObjectId = sObjectIdValue;

                restClient.getBlobField(sObjectName, sObjectId, sObjectBlobFieldName,
                    new RestClient.ResponseCallback() {
                        @Override
                        public void onResponse(InputStream response, SalesforceException exception) {
                            processResponse(exchange, response, exception, callback);
                            restoreFields(exchange, sObjectBase, sObjectId, null, null);
                        }
                    });
                break;
            }

            case QUERY:
                final String sObjectQuery = getParameter(SOBJECT_QUERY, exchange, USE_BODY, NOT_OPTIONAL);

                // use custom response class property
                setResponseClass(exchange, null);

                restClient.query(sObjectQuery, new RestClient.ResponseCallback() {
                    @Override
                    public void onResponse(InputStream response, SalesforceException exception) {
                        processResponse(exchange, response, exception, callback);
                    }
                });
                break;

            case QUERY_MORE:
                // reuse SOBJECT_QUERY parameter name for nextRecordsUrl
                final String nextRecordsUrl = getParameter(SOBJECT_QUERY, exchange, USE_BODY, NOT_OPTIONAL);

                // use custom response class property
                setResponseClass(exchange, null);

                restClient.queryMore(nextRecordsUrl, new RestClient.ResponseCallback() {
                    @Override
                    public void onResponse(InputStream response, SalesforceException exception) {
                        processResponse(exchange, response, exception, callback);
                    }
                });
                break;

            case SEARCH:
                final String sObjectSearch = getParameter(SOBJECT_SEARCH, exchange, USE_BODY, NOT_OPTIONAL);

                restClient.search(sObjectSearch, new RestClient.ResponseCallback() {
                    @Override
                    public void onResponse(InputStream response, SalesforceException exception) {
                        processResponse(exchange, response, exception, callback);
                    }
                });
                break;

            default:
                // without this, an operation not handled above would return false
                // below although no REST call (and so no callback) was ever issued
                throw new SalesforceException("Unknown operation name: " + operationName, null);

            }

        } catch (SalesforceException e) {
            exchange.setException(new SalesforceException(
                String.format("Error processing %s: [%s] \"%s\"",
                    operationName, e.getStatusCode(), e.getMessage()),
                e));
            callback.done(true);
            return true;
        } catch (RuntimeException e) {
            exchange.setException(new SalesforceException(
                String.format("Unexpected Error processing %s: \"%s\"",
                    operationName, e.getMessage()),
                e));
            callback.done(true);
            return true;
        }

        // continue routing asynchronously
        return false;
    }

    /**
     * Converts the external Id value read from an input DTO to the string sent
     * to Salesforce.
     *
     * @param value            value returned by {@link #getAndClearPropertyValue}
     * @param sObjectExtIdName name of the external Id property, for the error message
     * @return {@code value.toString()}
     * @throws SalesforceException if the DTO's external Id property was {@code null}
     *         (reported explicitly instead of failing with a NullPointerException)
     */
    private static String toExternalIdValue(Object value, String sObjectExtIdName) throws SalesforceException {
        if (value == null) {
            throw new SalesforceException(
                String.format("Missing value for external Id field %s", sObjectExtIdName), null);
        }
        return value.toString();
    }

    /**
     * Puts back the fields that were cleared on the input DTO before the call.
     * Does nothing when there was no input DTO.
     *
     * @param exchange         exchange that receives the exception if the external Id cannot be reset
     * @param sObjectBase      input DTO, may be {@code null}
     * @param sObjectId        Id to restore, or {@code null} to leave the Id untouched
     * @param sObjectExtIdName external Id property name, or {@code null}
     * @param oldValue         external Id value to restore, or {@code null}
     */
    private void restoreFields(Exchange exchange, AbstractSObjectBase sObjectBase,
                               String sObjectId, String sObjectExtIdName, Object oldValue) {
        // restore fields
        if (sObjectBase != null) {
            // restore the Id if it was cleared
            if (sObjectId != null) {
                sObjectBase.setId(sObjectId);
            }
            // restore the external id if it was cleared
            if (sObjectExtIdName != null && oldValue != null) {
                try {
                    setPropertyValue(sObjectBase, sObjectExtIdName, oldValue);
                } catch (SalesforceException e) {
                    // YES, the exchange may fail if the property cannot be reset!!!
                    exchange.setException(e);
                }
            }
        }
    }

    /**
     * Sets a property on the DTO through its public {@code set<name>} method.
     * <p>
     * The setter is resolved with the declared property type (the return type
     * of {@code get<name>}), the same way {@link #getAndClearPropertyValue}
     * resolves it. Looking it up by {@code value.getClass()} would miss the
     * setter whenever the declared type is not exactly the value's runtime class.
     *
     * @param sObjectBase DTO to modify
     * @param name        property name as used in the accessor names
     * @param value       value to set
     * @throws SalesforceException if the accessors do not exist or cannot be invoked
     */
    private void setPropertyValue(AbstractSObjectBase sObjectBase, String name, Object value) throws SalesforceException {
        try {
            final Class<?> sObjectClass = sObjectBase.getClass();
            // set the value with the set method
            Method getMethod = sObjectClass.getMethod("get" + name);
            Method setMethod = sObjectClass.getMethod("set" + name, getMethod.getReturnType());
            setMethod.invoke(sObjectBase, value);
        } catch (NoSuchMethodException e) {
            throw new SalesforceException(
                String.format("SObject %s does not have a field %s",
                    sObjectBase.getClass().getName(), name),
                e);
        } catch (InvocationTargetException e) {
            throw new SalesforceException(
                String.format("Error setting value %s.%s",
                    sObjectBase.getClass().getSimpleName(), name),
                e);
        } catch (IllegalAccessException e) {
            throw new SalesforceException(
                String.format("Error accessing value %s.%s",
                    sObjectBase.getClass().getSimpleName(), name),
                e);
        }
    }

    /**
     * Reads a property through {@code get<propertyName>} and then sets it to
     * {@code null} through {@code set<propertyName>}.
     *
     * @param sObjectBase  DTO to read and clear
     * @param propertyName property name as used in the accessor names
     * @return the value the property had before it was cleared, possibly {@code null}
     * @throws SalesforceException if the accessors do not exist or cannot be invoked
     */
    private Object getAndClearPropertyValue(AbstractSObjectBase sObjectBase, String propertyName) throws SalesforceException {
        try {
            // obtain the value using the get method
            Method getMethod = sObjectBase.getClass().getMethod("get" + propertyName);
            Object value = getMethod.invoke(sObjectBase);

            // clear the value with the set method
            Method setMethod = sObjectBase.getClass().getMethod("set" + propertyName, getMethod.getReturnType());
            // explicit array so that null is passed as the single argument
            setMethod.invoke(sObjectBase, new Object[] { null });

            return value;
        } catch (NoSuchMethodException e) {
            throw new SalesforceException(
                String.format("SObject %s does not have a field %s",
                    sObjectBase.getClass().getSimpleName(), propertyName),
                e);
        } catch (InvocationTargetException e) {
            throw new SalesforceException(
                String.format("Error getting/setting value %s.%s",
                    sObjectBase.getClass().getSimpleName(), propertyName),
                e);
        } catch (IllegalAccessException e) {
            throw new SalesforceException(
                String.format("Error accessing value %s.%s",
                    sObjectBase.getClass().getSimpleName(), propertyName),
                e);
        }
    }

    // pre-process request message
    protected abstract void processRequest(Exchange exchange) throws SalesforceException;

    // get request stream from In message
    protected abstract InputStream getRequestStream(Exchange exchange) throws SalesforceException;

    /**
     * Resolves the class the response should be mapped to and stores it in the
     * {@link #RESPONSE_CLASS} exchange property.
     *
     * @param exchange    exchange to store the property on
     * @param sObjectName SObject name to look up in the class map, or
     *                    {@code null} to resolve the class named by the
     *                    mandatory {@code SOBJECT_CLASS} parameter instead
     * @throws SalesforceException if the name is not in the class map, the
     *         parameter is missing, or the named class cannot be resolved
     */
    private void setResponseClass(Exchange exchange, String sObjectName) throws SalesforceException {
        Class<?> sObjectClass;

        if (sObjectName != null) {
            // lookup class from class map
            sObjectClass = classMap.get(sObjectName);
            if (null == sObjectClass) {
                throw new SalesforceException(String.format("No class found for SObject %s", sObjectName), null);
            }

        } else {

            // use custom response class property
            final String className = getParameter(SOBJECT_CLASS, exchange, IGNORE_BODY, NOT_OPTIONAL);
            try {
                sObjectClass = endpoint.getComponent().getCamelContext()
                    .getClassResolver().resolveMandatoryClass(className);
            } catch (ClassNotFoundException e) {
                throw new SalesforceException(
                    String.format("SObject class not found %s, %s",
                        className, e.getMessage()),
                    e);
            }
        }
        exchange.setProperty(RESPONSE_CLASS, sObjectClass);
    }

    // process response entity and set out message in exchange
    protected abstract void processResponse(Exchange exchange, InputStream responseEntity, SalesforceException ex, AsyncCallback callback);

}
http://git-wip-us.apache.org/repos/asf/camel/blob/72a1767e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractSalesforceProcessor.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractSalesforceProcessor.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractSalesforceProcessor.java new file mode 100644 index 0000000..e784458 --- /dev/null +++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractSalesforceProcessor.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.camel.component.salesforce.internal.processor;

import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.eclipse.jetty.client.HttpClient;
import org.apache.camel.component.salesforce.SalesforceComponent;
import org.apache.camel.component.salesforce.SalesforceEndpoint;
import org.apache.camel.component.salesforce.api.SalesforceException;
import org.apache.camel.component.salesforce.internal.OperationName;
import org.apache.camel.component.salesforce.internal.SalesforceSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
 * Common base for Salesforce processors.
 * <p>
 * Captures, at construction time, the endpoint, its operation name, a map
 * view of its configuration, the component's session and the configured HTTP
 * client, and offers {@link #getParameter} for resolving operation parameters.
 */
public abstract class AbstractSalesforceProcessor implements SalesforceProcessor {

    // readable names for the boolean arguments of getParameter()
    protected static final boolean NOT_OPTIONAL = false;
    protected static final boolean IS_OPTIONAL = true;
    protected static final boolean USE_BODY = true;
    protected static final boolean IGNORE_BODY = false;

    // per-instance logger named after the concrete subclass
    protected final Logger LOG = LoggerFactory.getLogger(this.getClass());

    protected final SalesforceEndpoint endpoint;
    protected final Map<String, String> endpointConfigMap;

    protected final OperationName operationName;
    protected final SalesforceSession session;
    protected final HttpClient httpClient;

    /**
     * @param endpoint endpoint this processor serves; its operation name,
     *                 configuration and component are read here
     */
    public AbstractSalesforceProcessor(SalesforceEndpoint endpoint) {
        this.endpoint = endpoint;
        this.operationName = endpoint.getOperationName();
        this.endpointConfigMap = endpoint.getConfiguration().toValueMap();

        final SalesforceComponent owner = endpoint.getComponent();
        this.session = owner.getSession();
        this.httpClient = endpoint.getConfiguration().getHttpClient();
    }

    @Override
    public abstract boolean process(Exchange exchange, AsyncCallback callback);

    /**
     * Resolves a parameter value, trying in order: the In message header named
     * {@code propName}, the endpoint configuration map, and - only when
     * {@code convertInBody} is set - the In message body converted to a String.
     *
     * @param propName      name of the property
     * @param exchange      exchange to inspect
     * @param convertInBody if {@code true}, falls back to the In body as a String
     * @param optional      if {@code true}, a missing value yields {@code null};
     *                      otherwise it is an error
     * @return the resolved value, or {@code null} for an optional parameter that was not found
     * @throws org.apache.camel.component.salesforce.api.SalesforceException
     *         if a non-optional property cannot be found
     */
    protected final String getParameter(String propName, Exchange exchange, boolean convertInBody, boolean optional) throws SalesforceException {
        String resolved = exchange.getIn().getHeader(propName, String.class);
        if (resolved == null) {
            resolved = endpointConfigMap.get(propName);
        }
        if (resolved == null && convertInBody) {
            resolved = exchange.getIn().getBody(String.class);
        }

        if (resolved != null || optional) {
            return resolved;
        }

        // error if property was not set
        throw new SalesforceException("Missing property " + propName, null);
    }

}
http://git-wip-us.apache.org/repos/asf/camel/blob/72a1767e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/BulkApiProcessor.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/BulkApiProcessor.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/BulkApiProcessor.java new file mode 100644 index 0000000..6e070f7 --- /dev/null +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/BulkApiProcessor.java @@ -0,0 +1,377 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
// components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/BulkApiProcessor.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.salesforce.internal.processor;

import org.apache.camel.*;
import org.apache.camel.converter.stream.StreamCacheConverter;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.component.salesforce.SalesforceEndpoint;
import org.apache.camel.component.salesforce.SalesforceEndpointConfig;
import org.apache.camel.component.salesforce.api.SalesforceException;
import org.apache.camel.component.salesforce.api.dto.bulk.*;
import org.apache.camel.component.salesforce.internal.client.BulkApiClient;
import org.apache.camel.component.salesforce.internal.client.DefaultBulkApiClient;

import java.io.IOException;
import java.io.InputStream;
import java.util.List;

import static org.apache.camel.component.salesforce.SalesforceEndpointConfig.*;

/**
 * Processor that maps the Bulk API operations onto calls to a {@link BulkApiClient}.
 * Each request is handed to the client together with a response callback that
 * completes the exchange once the client reports a result.
 */
public class BulkApiProcessor extends AbstractSalesforceProcessor {

    private final BulkApiClient bulkClient;

    /**
     * @param endpoint endpoint whose configuration supplies the API version, session and HTTP client
     * @throws SalesforceException declared for client creation
     */
    public BulkApiProcessor(SalesforceEndpoint endpoint) throws SalesforceException {
        super(endpoint);

        this.bulkClient = new DefaultBulkApiClient(
            endpointConfigMap.get(SalesforceEndpointConfig.API_VERSION), session, httpClient);
    }

    /**
     * Dispatches the exchange to the Bulk API call matching the endpoint's operation.
     *
     * @param exchange exchange carrying the request
     * @param callback callback to notify when the exchange is complete
     * @return {@code true} if the request failed before being handed to the client (the error is
     *         set on the exchange and the callback has already been invoked synchronously),
     *         {@code false} if the client took the request and the callback will be invoked
     *         from its response callback
     */
    @Override
    public boolean process(final Exchange exchange, final AsyncCallback callback) {
        try {
            switch (operationName) {
            case CREATE_JOB:
                bulkClient.createJob(exchange.getIn().getMandatoryBody(JobInfo.class),
                    jobInfoCallback(exchange, callback));
                break;

            case GET_JOB:
                bulkClient.getJob(resolveJobId(exchange), jobInfoCallback(exchange, callback));
                break;

            case CLOSE_JOB:
                bulkClient.closeJob(resolveJobId(exchange), jobInfoCallback(exchange, callback));
                break;

            case ABORT_JOB:
                bulkClient.abortJob(resolveJobId(exchange), jobInfoCallback(exchange, callback));
                break;

            case CREATE_BATCH:
                createBatch(exchange, callback);
                break;

            case GET_BATCH: {
                final BatchInfo batchBody = exchange.getIn().getBody(BatchInfo.class);
                final String jobId = resolveBatchJobId(batchBody, exchange);
                final String batchId = resolveBatchId(batchBody, exchange, USE_BODY);
                bulkClient.getBatch(jobId, batchId, batchInfoCallback(exchange, callback));
                break;
            }

            case GET_ALL_BATCHES:
                bulkClient.getAllBatches(resolveJobId(exchange), new BulkApiClient.BatchInfoListResponseCallback() {
                    @Override
                    public void onResponse(List<BatchInfo> batchInfoList, SalesforceException ex) {
                        processResponse(exchange, batchInfoList, ex, callback);
                    }
                });
                break;

            case GET_REQUEST: {
                final BatchInfo batchBody = exchange.getIn().getBody(BatchInfo.class);
                final String jobId = resolveBatchJobId(batchBody, exchange);
                final String batchId = resolveBatchId(batchBody, exchange, USE_BODY);
                bulkClient.getRequest(jobId, batchId,
                    streamCallback(exchange, callback, "Error retrieving batch request: "));
                break;
            }

            case GET_RESULTS: {
                final BatchInfo batchBody = exchange.getIn().getBody(BatchInfo.class);
                final String jobId = resolveBatchJobId(batchBody, exchange);
                final String batchId = resolveBatchId(batchBody, exchange, USE_BODY);
                bulkClient.getResults(jobId, batchId,
                    streamCallback(exchange, callback, "Error retrieving batch results: "));
                break;
            }

            case CREATE_BATCH_QUERY:
                createBatchQuery(exchange, callback);
                break;

            case GET_QUERY_RESULT_IDS: {
                final BatchInfo batchBody = exchange.getIn().getBody(BatchInfo.class);
                final String jobId = resolveBatchJobId(batchBody, exchange);
                final String batchId = resolveBatchId(batchBody, exchange, USE_BODY);
                bulkClient.getQueryResultIds(jobId, batchId, new BulkApiClient.QueryResultIdsCallback() {
                    @Override
                    public void onResponse(List<String> ids, SalesforceException ex) {
                        processResponse(exchange, ids, ex, callback);
                    }
                });
                break;
            }

            case GET_QUERY_RESULT: {
                final BatchInfo batchBody = exchange.getIn().getBody(BatchInfo.class);
                final String jobId = resolveBatchJobId(batchBody, exchange);
                final String batchId = resolveBatchId(batchBody, exchange, IGNORE_BODY);
                // a String body can only carry the result id, and only when the body is not a BatchInfo
                final String resultId = getParameter(RESULT_ID, exchange,
                    batchBody == null ? USE_BODY : IGNORE_BODY, NOT_OPTIONAL);
                bulkClient.getQueryResult(jobId, batchId, resultId,
                    streamCallback(exchange, callback, "Error retrieving query result: "));
                break;
            }

            default:
                // without this, an unsupported operation would return false and never invoke
                // the callback, leaving the exchange incomplete forever
                throw new SalesforceException("Unknown operation name: " + operationName, null);
            }

            // request handed to the client: routing continues asynchronously
            return false;

        } catch (SalesforceException e) {
            return failSynchronously(exchange, callback,
                String.format("Error processing %s: [%s] \"%s\"",
                    operationName, e.getStatusCode(), e.getMessage()),
                e);
        } catch (InvalidPayloadException e) {
            return failSynchronously(exchange, callback,
                String.format("Unexpected Error processing %s: \"%s\"",
                    operationName, e.getMessage()),
                e);
        } catch (RuntimeException e) {
            return failSynchronously(exchange, callback,
                String.format("Unexpected Error processing %s: \"%s\"",
                    operationName, e.getMessage()),
                e);
        }
    }

    /** Sets a wrapping SalesforceException on the exchange, completes it synchronously and returns {@code true}. */
    private boolean failSynchronously(Exchange exchange, AsyncCallback callback, String message, Exception cause) {
        exchange.setException(new SalesforceException(message, cause));
        callback.done(true);
        return true;
    }

    /** Handles CREATE_BATCH: the body is the batch payload, so ids come from headers or endpoint config only. */
    private void createBatch(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
        final ContentType contentType = ContentType.fromValue(
            getParameter(CONTENT_TYPE, exchange, IGNORE_BODY, NOT_OPTIONAL));
        final String jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);

        InputStream request;
        try {
            request = exchange.getIn().getMandatoryBody(InputStream.class);
        } catch (CamelException e) {
            String msg = "Error preparing batch request: " + e.getMessage();
            throw new SalesforceException(msg, e);
        }

        bulkClient.createBatch(request, jobId, contentType, batchInfoCallback(exchange, callback));
    }

    /** Handles CREATE_BATCH_QUERY: job details come from a JobInfo body, otherwise the String body is the SOQL query. */
    private void createBatchQuery(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
        final JobInfo jobBody = exchange.getIn().getBody(JobInfo.class);
        final String jobId;
        final ContentType contentType;
        final String soqlQuery;
        if (jobBody != null) {
            jobId = jobBody.getId();
            contentType = jobBody.getContentType();
            // use SOQL query from header or endpoint config
            soqlQuery = getParameter(SOBJECT_QUERY, exchange, IGNORE_BODY, NOT_OPTIONAL);
        } else {
            jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
            contentType = ContentType.fromValue(
                getParameter(CONTENT_TYPE, exchange, IGNORE_BODY, NOT_OPTIONAL));
            // reuse SOBJECT_QUERY property
            soqlQuery = getParameter(SOBJECT_QUERY, exchange, USE_BODY, NOT_OPTIONAL);
        }

        bulkClient.createBatchQuery(jobId, soqlQuery, contentType, batchInfoCallback(exchange, callback));
    }

    /** Returns the job id from a JobInfo body, otherwise from header, endpoint config or the String body. */
    private String resolveJobId(Exchange exchange) throws SalesforceException {
        final JobInfo jobBody = exchange.getIn().getBody(JobInfo.class);
        if (jobBody != null) {
            return jobBody.getId();
        }
        return getParameter(JOB_ID, exchange, USE_BODY, NOT_OPTIONAL);
    }

    /** Returns the job id of a batch: from the BatchInfo body if present, otherwise from header or endpoint config. */
    private String resolveBatchJobId(BatchInfo batchBody, Exchange exchange) throws SalesforceException {
        if (batchBody != null) {
            return batchBody.getJobId();
        }
        return getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
    }

    /** Returns the batch id: from the BatchInfo body if present, otherwise via getParameter with the given body flag. */
    private String resolveBatchId(BatchInfo batchBody, Exchange exchange, boolean convertInBody) throws SalesforceException {
        if (batchBody != null) {
            return batchBody.getId();
        }
        return getParameter(BATCH_ID, exchange, convertInBody, NOT_OPTIONAL);
    }

    /** Creates a callback that completes the exchange with the returned JobInfo. */
    private BulkApiClient.JobInfoResponseCallback jobInfoCallback(final Exchange exchange, final AsyncCallback callback) {
        return new BulkApiClient.JobInfoResponseCallback() {
            @Override
            public void onResponse(JobInfo jobInfo, SalesforceException ex) {
                processResponse(exchange, jobInfo, ex, callback);
            }
        };
    }

    /** Creates a callback that completes the exchange with the returned BatchInfo. */
    private BulkApiClient.BatchInfoResponseCallback batchInfoCallback(final Exchange exchange, final AsyncCallback callback) {
        return new BulkApiClient.BatchInfoResponseCallback() {
            @Override
            public void onResponse(BatchInfo batchInfo, SalesforceException ex) {
                processResponse(exchange, batchInfo, ex, callback);
            }
        };
    }

    /**
     * Creates a callback that copies the response stream into a StreamCache, closes the
     * stream, and completes the exchange with the cached body.
     *
     * @param errorPrefix message prefix used when reading the stream fails
     */
    private BulkApiClient.StreamResponseCallback streamCallback(final Exchange exchange, final AsyncCallback callback,
                                                               final String errorPrefix) {
        return new BulkApiClient.StreamResponseCallback() {
            @Override
            public void onResponse(InputStream inputStream, SalesforceException ex) {
                // read the stream into a StreamCache
                // ensures the connection is read
                StreamCache body = null;
                if (inputStream != null) {
                    try {
                        body = StreamCacheConverter.convertToStreamCache(inputStream, exchange);
                    } catch (IOException e) {
                        String msg = errorPrefix + e.getMessage();
                        ex = new SalesforceException(msg, e);
                    } finally {
                        // close the input stream to release the Http connection
                        try {
                            inputStream.close();
                        } catch (IOException ignored) {
                            // best-effort close: the content was already cached or the read error recorded
                        }
                    }
                }
                processResponse(exchange, body, ex, callback);
            }
        };
    }

    /**
     * Completes the exchange: sets either the exception or the Out body, copies the In
     * headers and attachments to the Out message and signals asynchronous completion.
     */
    private void processResponse(Exchange exchange, Object body, SalesforceException ex, AsyncCallback callback) {
        final Message out = exchange.getOut();
        if (ex != null) {
            exchange.setException(ex);
        } else {
            out.setBody(body);
        }

        // copy headers and attachments
        out.getHeaders().putAll(exchange.getIn().getHeaders());
        out.getAttachments().putAll(exchange.getIn().getAttachments());

        // signal exchange completion
        callback.done(false);
    }

    @Override
    public void start() throws Exception {
        ServiceHelper.startService(bulkClient);
    }

    @Override
    public void stop() throws Exception {
        // stop the client
        ServiceHelper.stopService(bulkClient);
    }
}
// components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/JsonRestProcessor.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.salesforce.internal.processor;

import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
import org.codehaus.jackson.type.TypeReference;
import org.eclipse.jetty.util.StringUtil;
import org.apache.camel.component.salesforce.SalesforceEndpoint;
import org.apache.camel.component.salesforce.api.SalesforceException;
import org.apache.camel.component.salesforce.api.dto.*;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;

/**
 * REST processor that marshals requests to, and un-marshals responses from, JSON
 * using a Jackson {@link ObjectMapper}.
 */
public class JsonRestProcessor extends AbstractRestProcessor {

    // exchange property holding a TypeReference for responses that are generic collections
    private static final String RESPONSE_TYPE = JsonRestProcessor.class.getName() + ".responseType";

    // it is ok to use a single thread safe ObjectMapper
    private final ObjectMapper objectMapper;

    public JsonRestProcessor(SalesforceEndpoint endpoint) throws SalesforceException {
        super(endpoint);

        this.objectMapper = new ObjectMapper();
        // enable date time support including Joda DateTime
        this.objectMapper.configure(SerializationConfig.Feature.WRITE_DATES_AS_TIMESTAMPS, false);
    }

    /**
     * Records, as an exchange property, the built-in response type of the current operation
     * so that {@link #processResponse} knows what to un-marshal into.
     *
     * @param exchange exchange being processed
     */
    @Override
    protected void processRequest(Exchange exchange) {

        switch (operationName) {
        case GET_VERSIONS:
            // handle in built response types
            exchange.setProperty(RESPONSE_TYPE, new TypeReference<List<Version>>() { });
            break;

        case GET_RESOURCES:
            // handle in built response types
            exchange.setProperty(RESPONSE_CLASS, RestResources.class);
            break;

        case GET_GLOBAL_OBJECTS:
            // handle in built response types
            exchange.setProperty(RESPONSE_CLASS, GlobalObjects.class);
            break;

        case GET_BASIC_INFO:
            // handle in built response types
            exchange.setProperty(RESPONSE_CLASS, SObjectBasicInfo.class);
            break;

        case GET_DESCRIPTION:
            // handle in built response types
            exchange.setProperty(RESPONSE_CLASS, SObjectDescription.class);
            break;

        case CREATE_SOBJECT:
        case UPSERT_SOBJECT:
            // handle known response type
            exchange.setProperty(RESPONSE_CLASS, CreateSObjectResult.class);
            break;

        case SEARCH:
            // handle known response type
            exchange.setProperty(RESPONSE_TYPE, new TypeReference<List<SearchResult>>() { });
            break;

        default:
            // no built-in response type is registered here for the remaining operations
            break;
        }
    }

    /**
     * Produces the request entity from the In body: an InputStream body is used as is, an
     * {@code AbstractSObjectBase} body is marshalled to JSON, and anything else is converted
     * to a String and encoded as UTF-8.
     *
     * @param exchange exchange carrying the request
     * @return stream with the request entity
     * @throws SalesforceException if marshalling fails or the body cannot be converted
     */
    @Override
    protected InputStream getRequestStream(Exchange exchange) throws SalesforceException {
        try {
            final Message in = exchange.getIn();

            final InputStream streamBody = in.getBody(InputStream.class);
            if (streamBody != null) {
                return streamBody;
            }

            final AbstractSObjectBase sObject = in.getBody(AbstractSObjectBase.class);
            if (sObject != null) {
                // marshall the SObject
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                objectMapper.writeValue(out, sObject);
                return new ByteArrayInputStream(out.toByteArray());
            }

            // if all else fails, get body as String
            final String body = in.getBody(String.class);
            if (null == body) {
                String msg = "Unsupported request message body "
                    + (in.getBody() == null ? null : in.getBody().getClass());
                throw new SalesforceException(msg, null);
            }
            return new ByteArrayInputStream(body.getBytes(StringUtil.__UTF8_CHARSET));

        } catch (IOException e) {
            String msg = "Error marshaling request: " + e.getMessage();
            throw new SalesforceException(msg, e);
        }
    }

    /**
     * Un-marshals the JSON response into the type registered by {@link #processRequest},
     * or passes the raw stream through when no type is registered, then completes the exchange.
     *
     * @param exchange       exchange to complete
     * @param responseEntity response stream, or {@code null} if there is none
     * @param ex             exception to set on the exchange when there is no response entity
     * @param callback       callback notified once the exchange is done
     */
    @Override
    protected void processResponse(Exchange exchange, InputStream responseEntity, SalesforceException ex, AsyncCallback callback) {

        // true once the raw stream itself has become the Out body and therefore must stay open
        boolean streamHandedOff = false;

        // process JSON response for TypeReference
        try {
            // do we need to un-marshal a response
            if (responseEntity != null) {
                Object response = null;
                Class<?> responseClass = exchange.getProperty(RESPONSE_CLASS, Class.class);
                if (responseClass != null) {
                    response = objectMapper.readValue(responseEntity, responseClass);
                } else {
                    TypeReference<?> responseType = exchange.getProperty(RESPONSE_TYPE, TypeReference.class);
                    if (responseType != null) {
                        response = objectMapper.readValue(responseEntity, responseType);
                    } else {
                        // return the response as a stream, for getBlobField
                        response = responseEntity;
                    }
                }
                exchange.getOut().setBody(response);
                streamHandedOff = response == responseEntity;
            } else {
                exchange.setException(ex);
            }
            // copy headers and attachments
            exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
            exchange.getOut().getAttachments().putAll(exchange.getIn().getAttachments());
        } catch (IOException e) {
            String msg = "Error parsing JSON response: " + e.getMessage();
            exchange.setException(new SalesforceException(msg, e));
        } finally {
            // cleanup temporary exchange headers
            exchange.removeProperty(RESPONSE_CLASS);
            exchange.removeProperty(RESPONSE_TYPE);

            // consume response entity, unless it is now the Out body: closing it here
            // would hand the downstream consumer an already-closed stream
            if (responseEntity != null && !streamHandedOff) {
                try {
                    responseEntity.close();
                } catch (IOException ignored) {
                    // best-effort close: the content has already been un-marshalled or has failed
                }
            }

            // notify callback that exchange is done
            callback.done(false);
        }

    }

}
http://git-wip-us.apache.org/repos/asf/camel/blob/72a1767e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/SalesforceProcessor.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/SalesforceProcessor.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/SalesforceProcessor.java new file mode 100644 index 0000000..36f77f6 --- /dev/null +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/SalesforceProcessor.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.salesforce.internal.processor; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.Service; + +public interface SalesforceProcessor extends Service { + + boolean process(Exchange exchange, AsyncCallback callback); + +} http://git-wip-us.apache.org/repos/asf/camel/blob/72a1767e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/XmlRestProcessor.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/XmlRestProcessor.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/XmlRestProcessor.java new file mode 100644 index 0000000..75449b6 --- /dev/null +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/XmlRestProcessor.java @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
// components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/XmlRestProcessor.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.salesforce.internal.processor;

import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.XStreamException;
import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
import com.thoughtworks.xstream.io.naming.NoNameCoder;
import com.thoughtworks.xstream.io.xml.CompactWriter;
import com.thoughtworks.xstream.io.xml.XppDriver;
import com.thoughtworks.xstream.mapper.CachingMapper;
import com.thoughtworks.xstream.mapper.CannotResolveClassException;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.eclipse.jetty.util.StringUtil;
import org.apache.camel.component.salesforce.SalesforceEndpoint;
import org.apache.camel.component.salesforce.api.JodaTimeConverter;
import org.apache.camel.component.salesforce.api.SalesforceException;
import org.apache.camel.component.salesforce.api.dto.*;

import java.io.*;

import static org.apache.camel.component.salesforce.SalesforceEndpointConfig.SOBJECT_NAME;

/**
 * REST processor that marshals requests to, and un-marshals responses from, XML
 * using a per-thread {@link XStream} instance.
 */
public class XmlRestProcessor extends AbstractRestProcessor {

    // although XStream is generally thread safe, because of the way we use aliases
    // for GET_BASIC_INFO and GET_DESCRIPTION, we need to use a ThreadLocal
    // not very efficient when both JSON and XML are used together with a single Thread pool
    // but this will do for now
    private static final ThreadLocal<XStream> xStream =
        new ThreadLocal<XStream>() {
            @Override
            protected XStream initialValue() {
                // use NoNameCoder to avoid escaping __ in custom field names
                // and CompactWriter to avoid pretty printing
                XStream result = new XStream(new XppDriver(new NoNameCoder()) {
                    @Override
                    public HierarchicalStreamWriter createWriter(Writer out) {
                        return new CompactWriter(out, getNameCoder());
                    }

                });
                result.registerConverter(new JodaTimeConverter());
                return result;
            }
        };

    // exchange property holding the XML root element name to alias to the response class
    private static final String RESPONSE_ALIAS = XmlRestProcessor.class.getName() + ".responseAlias";

    public XmlRestProcessor(SalesforceEndpoint endpoint) throws SalesforceException {
        super(endpoint);

    }

    /**
     * Records, as exchange properties, the response class and/or the root element alias
     * needed to un-marshal the response of the current operation.
     *
     * @param exchange exchange being processed
     * @throws SalesforceException if a required SObject name parameter is missing
     */
    @Override
    protected void processRequest(Exchange exchange) throws SalesforceException {

        switch (operationName) {
        case GET_VERSIONS:
            exchange.setProperty(RESPONSE_CLASS, Versions.class);
            break;

        case GET_RESOURCES:
            exchange.setProperty(RESPONSE_CLASS, RestResources.class);
            break;

        case GET_GLOBAL_OBJECTS:
            // handle in built response types
            exchange.setProperty(RESPONSE_CLASS, GlobalObjects.class);
            break;

        case GET_BASIC_INFO:
            // handle in built response types
            exchange.setProperty(RESPONSE_CLASS, SObjectBasicInfo.class);

            // need to add alias for Salesforce XML that uses SObject name as root element
            exchange.setProperty(RESPONSE_ALIAS,
                getParameter(SOBJECT_NAME, exchange, USE_BODY, NOT_OPTIONAL));
            break;

        case GET_DESCRIPTION:
            // handle in built response types
            exchange.setProperty(RESPONSE_CLASS, SObjectDescription.class);

            // need to add alias for Salesforce XML that uses SObject name as root element
            exchange.setProperty(RESPONSE_ALIAS,
                getParameter(SOBJECT_NAME, exchange, USE_BODY, NOT_OPTIONAL));
            break;

        case GET_SOBJECT:
        case GET_SOBJECT_WITH_ID:
            // need to add alias for Salesforce XML that uses SObject name as root element
            exchange.setProperty(RESPONSE_ALIAS,
                getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL));
            break;

        case CREATE_SOBJECT:
        case UPSERT_SOBJECT:
            // handle known response type
            exchange.setProperty(RESPONSE_CLASS, CreateSObjectResult.class);
            break;

        case QUERY:
        case QUERY_MORE:
            // query responses use a fixed root element name
            exchange.setProperty(RESPONSE_ALIAS,
                "QueryResult");
            break;

        case SEARCH:
            // handle known response type
            exchange.setProperty(RESPONSE_CLASS, SearchResults.class);
            break;

        default:
            // no response class or alias is registered here for the remaining operations
            break;
        }

    }

    /**
     * Produces the request entity from the In body: an InputStream body is used as is, an
     * {@code AbstractSObjectBase} body is marshalled to UTF-8 XML, and anything else is
     * converted to a String and encoded as UTF-8.
     *
     * @param exchange exchange carrying the request
     * @return stream with the request entity
     * @throws SalesforceException if marshalling fails or the body cannot be converted
     */
    @Override
    protected InputStream getRequestStream(Exchange exchange) throws SalesforceException {
        final XStream localXStream = xStream.get();
        try {
            // get request stream from In message
            final Message in = exchange.getIn();

            final InputStream streamBody = in.getBody(InputStream.class);
            if (streamBody != null) {
                return streamBody;
            }

            final AbstractSObjectBase sObject = in.getBody(AbstractSObjectBase.class);
            if (sObject != null) {
                // marshall the SObject
                // first process annotations on the class, for things like alias, etc.
                localXStream.processAnnotations(sObject.getClass());
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                // make sure we write the XML with the right encoding
                localXStream.toXML(sObject, new OutputStreamWriter(out, StringUtil.__UTF8_CHARSET));
                return new ByteArrayInputStream(out.toByteArray());
            }

            // if all else fails, get body as String
            final String body = in.getBody(String.class);
            if (null == body) {
                String msg = "Unsupported request message body "
                    + (in.getBody() == null ? null : in.getBody().getClass());
                throw new SalesforceException(msg, null);
            }
            return new ByteArrayInputStream(body.getBytes(StringUtil.__UTF8_CHARSET));

        } catch (XStreamException e) {
            String msg = "Error marshaling request: " + e.getMessage();
            throw new SalesforceException(msg, e);
        }
    }

    /**
     * Un-marshals the XML response into a new instance of the class registered by
     * {@link #processRequest}, or passes the raw stream through when no class is
     * registered, then completes the exchange.
     *
     * @param exchange       exchange to complete
     * @param responseEntity response stream, or {@code null} if there is none
     * @param exception      exception to set on the exchange when there is no response entity
     * @param callback       callback notified once the exchange is done
     */
    @Override
    protected void processResponse(Exchange exchange, InputStream responseEntity,
                                   SalesforceException exception, AsyncCallback callback) {
        final XStream localXStream = xStream.get();

        // true once the raw stream itself has become the Out body and therefore must stay open
        boolean streamHandedOff = false;
        try {
            // do we need to un-marshal a response
            if (responseEntity != null) {
                final Class<?> responseClass = exchange.getProperty(RESPONSE_CLASS, Class.class);
                Object response;
                if (responseClass != null) {
                    // its ok to call this multiple times, as xstream ignores duplicate calls
                    localXStream.processAnnotations(responseClass);
                    final String responseAlias = exchange.getProperty(RESPONSE_ALIAS, String.class);
                    if (responseAlias != null) {
                        // extremely dirty, need to flush entire cache if its holding on to an old alias!!!
                        final CachingMapper mapper = (CachingMapper) localXStream.getMapper();
                        try {
                            if (mapper.realClass(responseAlias) != responseClass) {
                                mapper.flushCache();
                            }
                        } catch (CannotResolveClassException ignore) {
                            // alias not known to the mapper yet, so there is nothing stale to flush
                        }
                        localXStream.alias(responseAlias, responseClass);
                    }
                    response = responseClass.newInstance();
                    localXStream.fromXML(responseEntity, response);
                } else {
                    // return the response as a stream, for getBlobField
                    response = responseEntity;
                }
                exchange.getOut().setBody(response);
                streamHandedOff = response == responseEntity;
            } else {
                exchange.setException(exception);
            }
            // copy headers and attachments
            exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
            exchange.getOut().getAttachments().putAll(exchange.getIn().getAttachments());
        } catch (XStreamException e) {
            String msg = "Error parsing XML response: " + e.getMessage();
            exchange.setException(new SalesforceException(msg, e));
        } catch (Exception e) {
            String msg = "Error creating XML response: " + e.getMessage();
            exchange.setException(new SalesforceException(msg, e));
        } finally {
            // cleanup temporary exchange headers
            exchange.removeProperty(RESPONSE_CLASS);
            exchange.removeProperty(RESPONSE_ALIAS);

            // consume response entity, unless it is now the Out body: closing it here
            // would hand the downstream consumer an already-closed stream
            if (responseEntity != null && !streamHandedOff) {
                try {
                    responseEntity.close();
                } catch (IOException ignored) {
                    // best-effort close: the content has already been un-marshalled or has failed
                }
            }

            // notify callback that exchange is done
            callback.done(false);
        }
    }

}