CAMEL-10743: Added support for rawPayload parameter in Salesforce Streaming API
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bb2adef7 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bb2adef7 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bb2adef7 Branch: refs/heads/camel-2.19.x Commit: bb2adef735af8b0e301651a391eff8fde246f3ba Parents: da79c3b Author: Dhiraj Bokde <dhira...@yahoo.com> Authored: Thu Aug 3 00:38:52 2017 -0700 Committer: Dhiraj Bokde <dhira...@yahoo.com> Committed: Thu Aug 3 01:14:26 2017 -0700 ---------------------------------------------------------------------- .../component/salesforce/SalesforceConsumer.java | 5 ++++- .../salesforce/StreamingApiIntegrationTest.java | 17 +++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/bb2adef7/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java index df29dd8..0484aee 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java @@ -174,7 +174,10 @@ public class SalesforceConsumer extends DefaultConsumer { final String sObjectString = objectMapper.writeValueAsString(sObject); log.debug("Received SObject: {}", sObjectString); - if (sObjectClass == null) { + if (endpoint.getConfiguration().getRawPayload()) { + // return sobject string as exchange body + in.setBody(sObjectString); + } else if (sObjectClass == null) { // return sobject map as exchange body in.setBody(sObject); } else { http://git-wip-us.apache.org/repos/asf/camel/blob/bb2adef7/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/StreamingApiIntegrationTest.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/StreamingApiIntegrationTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/StreamingApiIntegrationTest.java index e0ed9ff..33139df 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/StreamingApiIntegrationTest.java +++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/StreamingApiIntegrationTest.java @@ -36,6 +36,12 @@ public class StreamingApiIntegrationTest extends AbstractSalesforceTestBase { mock.expectedHeaderReceived("CamelSalesforceTopicName", "CamelTestTopic"); mock.expectedHeaderReceived("CamelSalesforceChannel", "/topic/CamelTestTopic"); + MockEndpoint rawPayloadMock = getMockEndpoint("mock:RawPayloadCamelTestTopic"); + rawPayloadMock.expectedMessageCount(1); + // assert expected static headers + rawPayloadMock.expectedHeaderReceived("CamelSalesforceTopicName", "CamelTestTopic"); + rawPayloadMock.expectedHeaderReceived("CamelSalesforceChannel", "/topic/CamelTestTopic"); + Merchandise__c merchandise = new Merchandise__c(); merchandise.setName("TestNotification"); merchandise.setDescription__c("Merchandise for testing Streaming API updated on " + ZonedDateTime.now().toString()); @@ -50,6 +56,7 @@ public class StreamingApiIntegrationTest extends AbstractSalesforceTestBase { mock.assertIsSatisfied(); final Message in = mock.getExchanges().get(0).getIn(); merchandise = in.getMandatoryBody(Merchandise__c.class); + assertNotNull("Missing event body", merchandise); log.info("Merchandise notification: {}", merchandise.toString()); assertNotNull("Missing field Id", merchandise.getId()); @@ -60,6 +67,11 @@ public class StreamingApiIntegrationTest extends AbstractSalesforceTestBase { assertNotNull("Missing header CamelSalesforceEventType", in.getHeader("CamelSalesforceEventType")); assertNotNull("Missing header CamelSalesforceCreatedDate", in.getHeader("CamelSalesforceCreatedDate")); + // validate raw payload message + rawPayloadMock.assertIsSatisfied(); + final Message inRaw = rawPayloadMock.getExchanges().get(0).getIn(); + assertTrue("Expected String message body for Raw Payload", inRaw.getBody() instanceof String); + } finally { // remove the test record assertNull(template().requestBody("direct:deleteSObjectWithId", merchandise)); @@ -89,6 +101,11 @@ public class StreamingApiIntegrationTest extends AbstractSalesforceTestBase { + "updateTopic=true&sObjectQuery=SELECT Id, Name FROM Merchandise__c"). to("mock:CamelTestTopic"); + from("salesforce:CamelTestTopic?rawPayload=true¬ifyForFields=ALL&" + + "notifyForOperationCreate=true¬ifyForOperationDelete=true¬ifyForOperationUpdate=true&" + + "updateTopic=true&sObjectQuery=SELECT Id, Name FROM Merchandise__c"). + to("mock:RawPayloadCamelTestTopic"); + // route for creating test record from("direct:upsertSObject"). to("salesforce:upsertSObject?SObjectIdName=Name");