This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 6d9dcca28a1 feat(camel-salesforce): Add Event ID as header for Pub/Sub API event consumption (#17298) 6d9dcca28a1 is described below commit 6d9dcca28a1b2b4ced2ea02b7c38f840e64f2fed Author: Nathan Allen <nathan.al...@toasttab.com> AuthorDate: Fri Feb 28 03:03:20 2025 -0500 feat(camel-salesforce): Add Event ID as header for Pub/Sub API event consumption (#17298) * add event id as header for pub/sub consumption * change header javaTypes * generate json --- .../camel/component/salesforce/salesforce.json | 39 +++++++++++----------- .../component/salesforce/PubSubApiConsumer.java | 4 ++- .../component/salesforce/SalesforceConstants.java | 9 +++-- .../internal/client/PubSubApiClient.java | 3 +- 4 files changed, 32 insertions(+), 23 deletions(-) diff --git a/components/camel-salesforce/camel-salesforce-component/src/generated/resources/META-INF/org/apache/camel/component/salesforce/salesforce.json b/components/camel-salesforce/camel-salesforce-component/src/generated/resources/META-INF/org/apache/camel/component/salesforce/salesforce.json index 0501bd30708..8b4597286f3 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/generated/resources/META-INF/org/apache/camel/component/salesforce/salesforce.json +++ b/components/camel-salesforce/camel-salesforce-component/src/generated/resources/META-INF/org/apache/camel/component/salesforce/salesforce.json @@ -130,25 +130,26 @@ }, "headers": { "CamelSalesforceReplayId": { "index": 0, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The Streaming API replayId.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_REPLAY_ID" }, - "CamelSalesforcePubSubReplayId": { "index": 1, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The Pub\/Sub API replayId.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_PUBSUB_REPLAY_ID" }, - "CamelSalesforceChangeEventSchema": { "index": 2, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The change event schema.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_CHANGE_EVENT_SCHEMA" }, - "CamelSalesforceEventType": { "index": 3, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The event type.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_EVENT_TYPE" }, - "CamelSalesforceCommitTimestamp": { "index": 4, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The commit timestamp.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_COMMIT_TIMESTAMP" }, - "CamelSalesforceCommitUser": { "index": 5, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The commit user.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_COMMIT_USER" }, - "CamelSalesforceCommitNumber": { "index": 6, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The commit number.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_COMMIT_NUMBER" }, - "CamelSalesforceRecordIds": { "index": 7, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The record ids.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_RECORD_IDS" }, - "CamelSalesforceChangeType": { "index": 8, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The change type.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_CHANGE_TYPE" }, - "CamelSalesforceChangeOrigin": { "index": 9, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The change origin.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_CHANGE_ORIGIN" }, - "CamelSalesforceTransactionKey": { "index": 10, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The transaction key.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_TRANSACTION_KEY" }, - "CamelSalesforceSequenceNumber": { "index": 11, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The sequence number.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_SEQUENCE_NUMBER" }, - "CamelSalesforceIsTransactionEnd": { "index": 12, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Is transaction end.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_IS_TRANSACTION_END" }, - "CamelSalesforceEntityName": { "index": 13, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The entity name.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_ENTITY_NAME" }, - "CamelSalesforcePlatformEventSchema": { "index": 14, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The platform event schema.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_PLATFORM_EVENT_SCHEMA" }, - "CamelSalesforceCreatedDate": { "index": 15, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "java.time.ZonedDateTime", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The created date.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_CREATED_DATE" }, - "CamelSalesforceTopicName": { "index": 16, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The topic name.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_TOPIC_NAME" }, - "CamelSalesforceChannel": { "index": 17, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The channel.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_CHANNEL" }, - "CamelSalesforceClientId": { "index": 18, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The client id.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_CLIENT_ID" }, - "CamelSalesforceQueryResultTotalSize": { "index": 19, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Total number of records matching a query.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_QUERY_RESULT_TOTAL_SIZE" } + "CamelSalesforceChangeEventSchema": { "index": 1, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The change event schema.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_CHANGE_EVENT_SCHEMA" }, + "CamelSalesforceEventType": { "index": 2, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The event type.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_EVENT_TYPE" }, + "CamelSalesforceCommitTimestamp": { "index": 3, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The commit timestamp.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_COMMIT_TIMESTAMP" }, + "CamelSalesforceCommitUser": { "index": 4, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The commit user.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_COMMIT_USER" }, + "CamelSalesforceCommitNumber": { "index": 5, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The commit number.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_COMMIT_NUMBER" }, + "CamelSalesforceRecordIds": { "index": 6, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The record ids.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_RECORD_IDS" }, + "CamelSalesforceChangeType": { "index": 7, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The change type.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_CHANGE_TYPE" }, + "CamelSalesforceChangeOrigin": { "index": 8, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The change origin.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_CHANGE_ORIGIN" }, + "CamelSalesforceTransactionKey": { "index": 9, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The transaction key.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_TRANSACTION_KEY" }, + "CamelSalesforceSequenceNumber": { "index": 10, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The sequence number.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_SEQUENCE_NUMBER" }, + "CamelSalesforceIsTransactionEnd": { "index": 11, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Is transaction end.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_IS_TRANSACTION_END" }, + "CamelSalesforceEntityName": { "index": 12, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The entity name.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_ENTITY_NAME" }, + "CamelSalesforcePlatformEventSchema": { "index": 13, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The platform event schema.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_PLATFORM_EVENT_SCHEMA" }, + "CamelSalesforceCreatedDate": { "index": 14, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "java.time.ZonedDateTime", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The created date.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_CREATED_DATE" }, + "CamelSalesforceTopicName": { "index": 15, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The topic name.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_TOPIC_NAME" }, + "CamelSalesforceChannel": { "index": 16, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The channel.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_CHANNEL" }, + "CamelSalesforceClientId": { "index": 17, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The client id.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_CLIENT_ID" }, + "CamelSalesforcePubSubReplayId": { "index": 18, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The Pub\/Sub API replayId.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_PUBSUB_REPLAY_ID" }, + "CamelSalesforcePubSubEventId": { "index": 19, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The Pub\/Sub API event id.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_PUBSUB_EVENT_ID" }, + "CamelSalesforceQueryResultTotalSize": { "index": 20, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Total number of records matching a query.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_QUERY_RESULT_TOTAL_SIZE" } }, "properties": { "operationName": { "index": 0, "kind": "path", "displayName": "Operation Name", "group": "common", "label": "common", "required": true, "type": "object", "javaType": "org.apache.camel.component.salesforce.internal.OperationName", "enum": [ "getVersions", "getResources", "getGlobalObjects", "getBasicInfo", "getDescription", "getSObject", "createSObject", "updateSObject", "deleteSObject", "getSObjectWithId", "upsertSObject", "deleteSObjectWithId", "getBlobField", "query", "queryMore", [...] diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java index ab7310e3eec..2f86af72c96 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java @@ -28,6 +28,7 @@ import org.apache.camel.component.salesforce.internal.client.PubSubApiClient; import org.apache.camel.support.DefaultConsumer; import org.apache.camel.support.service.ServiceHelper; +import static org.apache.camel.component.salesforce.SalesforceConstants.HEADER_SALESFORCE_PUBSUB_EVENT_ID; import static org.apache.camel.component.salesforce.SalesforceConstants.HEADER_SALESFORCE_PUBSUB_REPLAY_ID; public class PubSubApiConsumer extends DefaultConsumer { @@ -62,10 +63,11 @@ public class PubSubApiConsumer extends DefaultConsumer { } } - public void processEvent(Object recordObj, String replayId) { + public void processEvent(Object recordObj, String eventId, String replayId) { final Exchange exchange = createExchange(true); final Message in = exchange.getIn(); in.setBody(recordObj); + in.setHeader(HEADER_SALESFORCE_PUBSUB_EVENT_ID, eventId); in.setHeader(HEADER_SALESFORCE_PUBSUB_REPLAY_ID, replayId); AsyncCallback cb = defaultConsumerCallback(exchange, true); getAsyncProcessor().process(exchange, cb); diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConstants.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConstants.java index 61b6d78316a..b1c6e2703e1 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConstants.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConstants.java @@ -20,10 +20,9 @@ import org.apache.camel.spi.Metadata; public final class SalesforceConstants { + // Streaming headers @Metadata(label = "consumer", description = "The Streaming API replayId.", javaType = "Object") public static final String HEADER_SALESFORCE_REPLAY_ID = "CamelSalesforceReplayId"; - @Metadata(label = "consumer", description = "The Pub/Sub API replayId.", javaType = "Object") - public static final String HEADER_SALESFORCE_PUBSUB_REPLAY_ID = "CamelSalesforcePubSubReplayId"; @Metadata(label = "consumer", description = "The change event schema.", javaType = "Object") public static final String HEADER_SALESFORCE_CHANGE_EVENT_SCHEMA = "CamelSalesforceChangeEventSchema"; @Metadata(label = "consumer", description = "The event type.", javaType = "String") @@ -59,6 +58,12 @@ public final class SalesforceConstants { @Metadata(label = "consumer", description = "The client id.", javaType = "String") public static final String HEADER_SALESFORCE_CLIENT_ID = "CamelSalesforceClientId"; + // Pub/Sub API headers + @Metadata(label = "consumer", description = "The Pub/Sub API replayId.", javaType = "String") + public static final String HEADER_SALESFORCE_PUBSUB_REPLAY_ID = "CamelSalesforcePubSubReplayId"; + @Metadata(label = "consumer", description = "The Pub/Sub API event id.", javaType = "String") + public static final String HEADER_SALESFORCE_PUBSUB_EVENT_ID = "CamelSalesforcePubSubEventId"; + @Metadata(label = "producer", description = "Total number of records matching a query.", javaType = "int") public static final String HEADER_SALESFORCE_QUERY_RESULT_TOTAL_SIZE = "CamelSalesforceQueryResultTotalSize"; diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java index ef16d7ecf4f..375b08b0ee3 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java @@ -457,8 +457,9 @@ public class PubSubApiClient extends ServiceSupport { case POJO -> deserializePojo(ce, schema); case JSON -> deserializeJson(ce, schema); }; + String eventId = ce.getEvent().getId(); String replayId = PubSubApiClient.base64EncodeByteString(ce.getReplayId()); - consumer.processEvent(recordObj, replayId); + consumer.processEvent(recordObj, eventId, replayId); } private Object deserializeAvro(ConsumerEvent ce, Schema schema) throws IOException {