This is an automated email from the ASF dual-hosted git repository. jeremyross pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new e9dd46e23b2 CAMEL-18317: camel-salesforce: Add `subscribe` operation. e9dd46e23b2 is described below commit e9dd46e23b29068331af6a523453edbe815bf485 Author: Jeremy Ross <jeremy.g.r...@gmail.com> AuthorDate: Sun Aug 7 13:54:07 2022 -0500 CAMEL-18317: camel-salesforce: Add `subscribe` operation. This allows endpoint syntax to have a mandatory operationName. --- .../salesforce/SalesforceEndpointUriFactory.java | 2 +- .../camel/component/salesforce/salesforce.json | 2 +- .../src/main/docs/salesforce-component.adoc | 24 +++++++++++----------- .../component/salesforce/SalesforceComponent.java | 23 +++++++++++---------- .../component/salesforce/SalesforceEndpoint.java | 13 ++++-------- .../salesforce/internal/OperationName.java | 4 +++- .../ChangeEventsConsumerIntegrationTest.java | 2 +- .../PlatformEventsConsumerIntegrationTest.java | 3 ++- .../salesforce/StreamingApiIntegrationTest.java | 6 +++--- .../ROOT/pages/camel-3x-upgrade-guide-3_19.adoc | 4 ++++ 10 files changed, 43 insertions(+), 40 deletions(-) diff --git a/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointUriFactory.java b/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointUriFactory.java index 86a2b97c7a5..4d10a0fe5d2 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointUriFactory.java +++ b/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointUriFactory.java @@ -99,7 +99,7 @@ public class SalesforceEndpointUriFactory extends org.apache.camel.support.compo Map<String, Object> copy = new HashMap<>(properties); - uri = buildPathParameter(syntax, uri, "operationName", null, false, copy); + uri = 
buildPathParameter(syntax, uri, "operationName", null, true, copy); uri = buildPathParameter(syntax, uri, "topicName", null, false, copy); uri = buildQueryParameters(uri, copy, encode); return uri; diff --git a/components/camel-salesforce/camel-salesforce-component/src/generated/resources/org/apache/camel/component/salesforce/salesforce.json b/components/camel-salesforce/camel-salesforce-component/src/generated/resources/org/apache/camel/component/salesforce/salesforce.json index 56ef73e696e..56a68b8a6df 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/generated/resources/org/apache/camel/component/salesforce/salesforce.json +++ b/components/camel-salesforce/camel-salesforce-component/src/generated/resources/org/apache/camel/component/salesforce/salesforce.json @@ -138,7 +138,7 @@ "CamelSalesforceQueryResultTotalSize": { "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Total number of records matching a query.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_QUERY_RESULT_TOTAL_SIZE" } }, "properties": { - "operationName": { "kind": "path", "displayName": "Operation Name", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.camel.component.salesforce.internal.OperationName", "enum": [ "getVersions", "getResources", "getGlobalObjects", "getBasicInfo", "getDescription", "getSObject", "createSObject", "updateSObject", "deleteSObject", "getSObjectWithId", "upsertSObject", "deleteSObjectWithId", "getBlobField", "query", "queryMore", "queryA [...] 
+ "operationName": { "kind": "path", "displayName": "Operation Name", "group": "common", "label": "common", "required": true, "type": "object", "javaType": "org.apache.camel.component.salesforce.internal.OperationName", "enum": [ "getVersions", "getResources", "getGlobalObjects", "getBasicInfo", "getDescription", "getSObject", "createSObject", "updateSObject", "deleteSObject", "getSObjectWithId", "upsertSObject", "deleteSObjectWithId", "getBlobField", "query", "queryMore", "queryAll", [...] "topicName": { "kind": "path", "displayName": "Topic Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The name of the topic\/channel to use" }, "apexMethod": { "kind": "parameter", "displayName": "Apex Method", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "APEX method name" }, "apexQueryParams": { "kind": "parameter", "displayName": "Apex Query Params", "group": "common", "label": "", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Query params for APEX method" }, diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc b/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc index c527bbe02d3..fbb608cb786 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc +++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc @@ -115,7 +115,7 @@ must contain only single certificate-private key entry. When used as a consumer, receiving streaming events, the URI scheme is: ---- -salesforce:topic?options +salesforce:subscribe:topic?options ---- When used as a producer, invoking the Salesforce REST APIs, the URI @@ -1618,21 +1618,21 @@ Data Capture events. The URI format for consuming Push Topics is: ---- -salesforce:<topic_name>[?options] +salesforce:subscribe:<topic_name>[?options] ---- To create and subscribe to a topic [source,java] ---- -from("salesforce:CamelTestTopic?notifyForFields=ALL&notifyForOperations=ALL&sObjectName=Merchandise__c&updateTopic=true&sObjectQuery=SELECT Id, Name FROM Merchandise__c")... +from("salesforce:subscribe:CamelTestTopic?notifyForFields=ALL&notifyForOperations=ALL&sObjectName=Merchandise__c&updateTopic=true&sObjectQuery=SELECT Id, Name FROM Merchandise__c")... ---- To subscribe to an existing topic [source,java] ---- -from("salesforce:CamelTestTopic&sObjectName=Merchandise__c")... +from("salesforce:subscribe:CamelTestTopic&sObjectName=Merchandise__c")... 
---- |=== @@ -1663,14 +1663,14 @@ To emit a platform event use the <<createSObject,createSObject>> operation, pass The URI format for consuming platform events is: ---- -salesforce:event/<event_name> +salesforce:subscribe:event/<event_name> ---- For example, to receive platform events use for the event type `Order_Event__e`: [source,java] ---- -from("salesforce:event/Order_Event__e") +from("salesforce:subscribe:event/Order_Event__e") ---- |=== @@ -1694,25 +1694,25 @@ The URI format to consume CDC events is as follows: All Selected Entities ---- -salesforce:data/ChangeEvents +salesforce:subscribe:data/ChangeEvents ---- Standard Objects ---- -salesforce:data/<Standard_Object_Name>ChangeEvent +salesforce:subscribe:data/<Standard_Object_Name>ChangeEvent ---- Custom Objects ---- -salesforce:data/<Custom_Object_Name>__ChangeEvent +salesforce:subscribe:data/<Custom_Object_Name>__ChangeEvent ---- Here are a few examples [source,java] ---- -from("salesforce:data/ChangeEvents?replayId=-1").log("being notified of all change events") -from("salesforce:data/AccountChangeEvent?replayId=-1").log("being notified of change events for Account records") -from("salesforce:data/Employee__ChangeEvent?replayId=-1").log("being notified of change events for Employee__c custom object") +from("salesforce:subscribe:data/ChangeEvents?replayId=-1").log("being notified of all change events") +from("salesforce:subscribe:data/AccountChangeEvent?replayId=-1").log("being notified of change events for Account records") +from("salesforce:subscribe:data/Employee__ChangeEvent?replayId=-1").log("being notified of change events for Employee__c custom object") ---- More details about how to use the Camel Salesforce component change data capture capabilities could be found in the https://github.com/apache/camel/tree/main/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/ChangeEventsConsumerIntegrationTest.java[ChangeEventsConsumerIntegrationTest]. 
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java index 80a699c47f3..bd9d4108be1 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java @@ -288,19 +288,20 @@ public class SalesforceComponent extends DefaultComponent implements SSLContextP OperationName operationName = null; String topicName = null; String apexUrl = null; - try { - LOG.debug("Creating endpoint for: {}", remaining); - if (remaining.startsWith(APEX_CALL_PREFIX)) { - // extract APEX URL - apexUrl = remaining.substring(APEX_CALL_PREFIX.length()); - remaining = OperationName.APEX_CALL.value(); + LOG.debug("Creating endpoint for: {}", remaining); + if (remaining.startsWith(APEX_CALL_PREFIX)) { + // extract APEX URL + apexUrl = remaining.substring(APEX_CALL_PREFIX.length()); + remaining = OperationName.APEX_CALL.value(); + } else if (remaining.startsWith(OperationName.SUBSCRIBE.value())) { + final String[] parts = remaining.split(":"); + if (parts.length != 2) { + throw new IllegalArgumentException("topicName must be supplied for subscribe operation."); } - operationName = OperationName.fromValue(remaining); - } catch (IllegalArgumentException ex) { - // if its not an operation name, treat is as topic name for consumer - // endpoints - topicName = remaining; + remaining = parts[0]; + topicName = parts[1]; } + operationName = OperationName.fromValue(remaining); // create endpoint config if (config == null) { diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java index f9ad7993098..da9702ed0dc 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java @@ -22,6 +22,7 @@ import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.component.salesforce.internal.OperationName; import org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper; +import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriPath; @@ -41,7 +42,7 @@ public class SalesforceEndpoint extends DefaultEndpoint { private static final Logger LOG = LoggerFactory.getLogger(SalesforceEndpoint.class); //CHECKSTYLE:OFF - @UriPath(label = "producer", description = "The operation to use", enums = "getVersions," + @UriPath( label = "common", description = "The operation to use", enums = "getVersions," + "getResources,getGlobalObjects,getBasicInfo,getDescription,getSObject,createSObject," + "updateSObject,deleteSObject,getSObjectWithId,upsertSObject,deleteSObjectWithId," + "getBlobField,query,queryMore,queryAll,search,apexCall,recent,createJob,getJob," @@ -56,7 +57,8 @@ public class SalesforceEndpoint extends DefaultEndpoint { + "bulk2AbortJob,bulk2DeleteJob,bulk2GetSuccessfulResults,bulk2GetFailedResults," + "bulk2GetUnprocessedRecords,bulk2CreateQueryJob,bulk2GetQueryJob," + "bulk2GetAllQueryJobs,bulk2GetQueryJobResults,bulk2AbortQueryJob,bulk2DeleteQueryJob," - + "raw") + + "raw,subscribe") + @Metadata(required = true) private final OperationName operationName; //CHECKSTYLE:ON @UriPath(label = "consumer", description = "The name of the topic/channel to use") @@ -89,13 +91,6 @@ 
public class SalesforceEndpoint extends DefaultEndpoint { @Override public Consumer createConsumer(Processor processor) throws Exception { - // consumer requires a topicName, operation name must be the invalid - // topic name - if (topicName == null) { - throw new IllegalArgumentException( - String.format("Invalid topic name %s, matches a producer operation name", operationName.value())); - } - final SubscriptionHelper subscriptionHelper = getComponent().getSubscriptionHelper(); final SalesforceConsumer consumer = new SalesforceConsumer(this, processor, subscriptionHelper); configureConsumer(consumer); diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/OperationName.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/OperationName.java index 53811af0c00..c05403850b9 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/OperationName.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/OperationName.java @@ -105,7 +105,9 @@ public enum OperationName { COMPOSITE_DELETE_SOBJECT_COLLECTIONS("compositeDeleteSObjectCollections"), // Raw operation - RAW("raw"); + RAW("raw"), + + SUBSCRIBE("subscribe"); private final String value; diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/ChangeEventsConsumerIntegrationTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/ChangeEventsConsumerIntegrationTest.java index 83e6060bc45..02ae66c6f28 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/ChangeEventsConsumerIntegrationTest.java +++ 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/ChangeEventsConsumerIntegrationTest.java @@ -101,7 +101,7 @@ public class ChangeEventsConsumerIntegrationTest extends AbstractSalesforceTestB return new RouteBuilder() { @Override public void configure() { - from("salesforce:data/ChangeEvents?replayId=-1").to(capturedChangeEvents); + from("salesforce:subscribe:data/ChangeEvents?replayId=-1").to(capturedChangeEvents); } }; } diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PlatformEventsConsumerIntegrationTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PlatformEventsConsumerIntegrationTest.java index cecd0e34697..03341d84f1c 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PlatformEventsConsumerIntegrationTest.java +++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PlatformEventsConsumerIntegrationTest.java @@ -39,7 +39,8 @@ public class PlatformEventsConsumerIntegrationTest extends AbstractSalesforceTes final ExecutorService parallel = Executors.newSingleThreadExecutor(); final Future<PlatformEvent> futurePlatformEvent - = parallel.submit(() -> consumer.receiveBody("salesforce:event/TestEvent__e?replayId=-1", PlatformEvent.class)); + = parallel.submit( + () -> consumer.receiveBody("salesforce:subscribe:event/TestEvent__e?replayId=-1", PlatformEvent.class)); // it takes some time for the subscriber to subscribe, so we'll try to // send repeated platform events and wait until the first one is diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/StreamingApiIntegrationTest.java 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/StreamingApiIntegrationTest.java index 30de8291c98..07a8b7de8b1 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/StreamingApiIntegrationTest.java +++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/StreamingApiIntegrationTest.java @@ -107,16 +107,16 @@ public class StreamingApiIntegrationTest extends AbstractSalesforceTestBase { public void configure() throws Exception { // test topic subscription - from("salesforce:CamelTestTopic?notifyForFields=ALL&" + from("salesforce:subscribe:CamelTestTopic?notifyForFields=ALL&" + "notifyForOperationCreate=true&notifyForOperationDelete=true&notifyForOperationUpdate=true&" + "sObjectName=Merchandise__c&" + "updateTopic=true&sObjectQuery=SELECT Id, Name FROM Merchandise__c") .to("mock:CamelTestTopic"); - from("salesforce:CamelTestTopic?rawPayload=true&notifyForFields=ALL&" + from("salesforce:subscribe:CamelTestTopic?rawPayload=true&notifyForFields=ALL&" + "notifyForOperationCreate=true&notifyForOperationDelete=true&notifyForOperationUpdate=true&" + "updateTopic=true&sObjectQuery=SELECT Id, Name FROM Merchandise__c").to("mock:RawPayloadCamelTestTopic"); - from("salesforce:CamelFallbackTestTopic?notifyForFields=ALL&defaultReplayId=9999&" + from("salesforce:subscribe:CamelFallbackTestTopic?notifyForFields=ALL&defaultReplayId=9999&" + "notifyForOperationCreate=true&notifyForOperationDelete=true&notifyForOperationUpdate=true&" + "sObjectName=Merchandise__c&" + "updateTopic=true&sObjectQuery=SELECT Id, Name FROM Merchandise__c") .to("mock:CamelFallbackTestTopic"); diff --git a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_19.adoc b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_19.adoc index cfc824c59b5..483ba331c62 100644 --- 
a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_19.adoc +++ b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_19.adoc @@ -15,3 +15,7 @@ Added `addClassLoader` method to `org.apache.camel.spi.ClassResolver`. Removed using `template` as a custom alias for `routeTemplate` or `route-template`. Removed the `tod` custom alias for `toD` or `to-d`. + +=== camel-salesforce + +The URI format for consumer operations has changed. All consumer URIs now use the `subscribe` operation. E.g., `salesforce:subscribe:<topic_name>`, `salesforce:subscribe:event/<event_name>`, `salesforce:subscribe:data/ChangeEvents`.