This is an automated email from the ASF dual-hosted git repository.

oalsafi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

The following commit(s) were added to refs/heads/main by this push:
     new 9832f0a  CAMEL-16588: Add Changefeed support to camel-azure-storage-blob component (#5931)
9832f0a is described below

commit 9832f0a7b100034a4344225c6455a1c298bc059f
Author: Omar Al-Safi <omars...@gmail.com>
AuthorDate: Thu Aug 5 10:50:41 2021 +0200

    CAMEL-16588: Add Changefeed support to camel-azure-storage-blob component (#5931)

    * CAMEL-16588: Add Blob changefeed support

    * CAMEL-16588: Add Blob changefeed docs
---
 .../catalog/docs/azure-storage-blob-component.adoc | 14 ++-
 .../camel-azure/camel-azure-storage-blob/pom.xml | 6 ++
 .../storage/blob/BlobComponentConfigurer.java | 18 ++++
 .../azure/storage/blob/BlobEndpointConfigurer.java | 18 ++++
 .../azure/storage/blob/BlobEndpointUriFactory.java | 43 ++++----
 .../azure/storage/blob/azure-storage-blob.json | 6 ++
 .../main/docs/azure-storage-blob-component.adoc | 14 ++-
 .../azure/storage/blob/BlobConfiguration.java | 46 ++++++++
 .../blob/BlobConfigurationOptionsProxy.java | 15 +++
 .../azure/storage/blob/BlobConstants.java | 4 +
 .../azure/storage/blob/BlobExchangeHeaders.java | 13 +++
 .../storage/blob/BlobOperationsDefinition.java | 5 +
 .../component/azure/storage/blob/BlobProducer.java | 13 +++
 .../blob/operations/BlobChangeFeedOperations.java | 67 ++++++++++++
 .../azure/storage/blob/BlobComponentTest.java | 29 +++++
 .../integration/BlobChangeFeedOperationsIT.java | 104 ++++++++++++++++++
 .../src/test/resources/log4j2.properties | 2 +-
 .../AzureStorageBlobComponentBuilderFactory.java | 61 +++++++++++
 .../endpoint/dsl/BlobEndpointBuilderFactory.java | 120 ++++++++++++++++++++-
 .../ROOT/pages/azure-storage-blob-component.adoc | 14 ++-
 parent/pom.xml | 1 +
 21 files changed, 585 insertions(+), 28 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/azure-storage-blob-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/azure-storage-blob-component.adoc
index
1a6bf2c..11f7351 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/azure-storage-blob-component.adoc +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/azure-storage-blob-component.adoc @@ -65,7 +65,7 @@ to("file://blobdirectory"); // component options: START -The Azure Storage Blob Service component supports 27 options, which are listed below. +The Azure Storage Blob Service component supports 30 options, which are listed below. @@ -89,6 +89,9 @@ The Azure Storage Blob Service component supports 27 options, which are listed b | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean | *blobSequenceNumber* (producer) | A user-controlled value that you can use to track requests. The value of the sequence number must be between 0 and 263 - 1.The default value is 0. | 0 | Long | *blockListType* (producer) | Specifies which type of blocks to return. There are 3 enums and the value can be one of: committed, uncommitted, all | COMMITTED | BlockListType +| *changeFeedContext* (producer) | When using getChangeFeed producer operation, this gives additional context that is passed through the Http pipeline during the service call. | | Context +| *changeFeedEndTime* (producer) | When using getChangeFeed producer operation, this filters the results to return events approximately before the end time. Note: A few events belonging to the next hour can also be returned. A few events belonging to this hour can be missing; to ensure all events from the hour are returned, round the end time up by an hour. 
| | OffsetDateTime +| *changeFeedStartTime* (producer) | When using getChangeFeed producer operation, this filters the results to return events approximately after the start time. Note: A few events belonging to the previous hour can also be returned. A few events belonging to this hour can be missing; to ensure all events from the hour are returned, round the start time down by an hour. | | OffsetDateTime | *closeStreamAfterWrite* (producer) | Close the stream after write or keep it open, default is true | true | boolean | *commitBlockListLater* (producer) | When is set to true, the staged blocks will not be committed directly. | true | boolean | *createAppendBlob* (producer) | When is set to true, the append blocks will be created when committing append blocks. | true | boolean @@ -122,7 +125,7 @@ with the following path and query parameters: |=== -=== Query Parameters (44 parameters): +=== Query Parameters (47 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -149,6 +152,9 @@ with the following path and query parameters: | *pollStrategy* (consumer) | A pluggable org.apache.camel.PollingConsumerPollingStrategy allowing you to provide your custom implementation to control error handling usually occurred during the poll operation before an Exchange have been created and being routed in Camel. | | PollingConsumerPollStrategy | *blobSequenceNumber* (producer) | A user-controlled value that you can use to track requests. The value of the sequence number must be between 0 and 263 - 1.The default value is 0. | 0 | Long | *blockListType* (producer) | Specifies which type of blocks to return. There are 3 enums and the value can be one of: committed, uncommitted, all | COMMITTED | BlockListType +| *changeFeedContext* (producer) | When using getChangeFeed producer operation, this gives additional context that is passed through the Http pipeline during the service call. 
| | Context +| *changeFeedEndTime* (producer) | When using getChangeFeed producer operation, this filters the results to return events approximately before the end time. Note: A few events belonging to the next hour can also be returned. A few events belonging to this hour can be missing; to ensure all events from the hour are returned, round the end time up by an hour. | | OffsetDateTime +| *changeFeedStartTime* (producer) | When using getChangeFeed producer operation, this filters the results to return events approximately after the start time. Note: A few events belonging to the previous hour can also be returned. A few events belonging to this hour can be missing; to ensure all events from the hour are returned, round the start time down by an hour. | | OffsetDateTime | *closeStreamAfterWrite* (producer) | Close the stream after write or keep it open, default is true | true | boolean | *commitBlockListLater* (producer) | When is set to true, the staged blocks will not be committed directly. | true | boolean | *createAppendBlob* (producer) | When is set to true, the append blocks will be created when committing append blocks. | true | boolean @@ -231,6 +237,9 @@ aggregate this number of messages. |`CamelAzureStorageBlobContainerName`|`BlobConstants.BLOB_CONTAINER_NAME`|`String`|Operations related to container and blob|Override/set the container name on the exchange headers. |`CamelAzureStorageBlobOperation`|`BlobConstants.BLOB_OPERATION`|`BlobOperationsDefinition`|All|Specify the producer operation to execute, please see the doc on this page related to producer operation. |`CamelAzureStorageBlobRegex`|`BlobConstants.REGEX`|`String`|`listBlobs`,`getBlob`|Filters the results to return only blobs whose names match the specified regular expression. May be null to return all. If both prefix and regex are set, regex takes the priority and prefix is ignored. 
+|`CamelAzureStorageBlobChangeFeedStartTime`|`BlobConstants.CHANGE_FEED_START_TIME`|`OffsetDateTime`|`getChangeFeed`| It filters the results to return events approximately after the start time. Note: A few events belonging to the previous hour can also be returned. A few events belonging to this hour can be missing; to ensure all events from the hour are returned, round the start time down by an hour. +|`CamelAzureStorageBlobChangeFeedEndTime`|`BlobConstants.CHANGE_FEED_END_TIME`|`OffsetDateTime`|`getChangeFeed`| It filters the results to return events approximately before the end time. Note: A few events belonging to the next hour can also be returned. A few events belonging to this hour can be missing; to ensure all events from the hour are returned, round the end time up by an hour. +|`CamelAzureStorageBlobChangeFeedContext`|`BlobConstants.CHANGE_FEED_CONTEXT`|`Context`|`getChangeFeed`| This gives additional context that is passed through the Http pipeline during the service call. |======================================================================= === Message headers set by either component producer or consumer @@ -316,6 +325,7 @@ For these operations, `accountName` is *required*. |=== |Operation |Description |`listBlobContainers` |Get the content of the blob. You can restrict the output of this operation to a blob range. +|`getChangeFeed` | Returns transaction logs of all the changes that occur to the blobs and the blob metadata in your storage account. The change feed provides ordered, guaranteed, durable, immutable, read-only log of these changes. 
 |===
 
 *Operations on the container level*
diff --git a/components/camel-azure/camel-azure-storage-blob/pom.xml b/components/camel-azure/camel-azure-storage-blob/pom.xml
index 88e38e1..7c0b66c 100644
--- a/components/camel-azure/camel-azure-storage-blob/pom.xml
+++ b/components/camel-azure/camel-azure-storage-blob/pom.xml
@@ -46,6 +46,12 @@
             <groupId>com.azure</groupId>
             <artifactId>azure-storage-blob</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.azure</groupId>
+            <artifactId>azure-storage-blob-changefeed</artifactId>
+            <!-- Currently is not added to Azure BOM, once it has been added, we can remove the version from here -->
+            <version>${azure-storage-blob-changedfeed-version}</version>
+        </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
diff --git a/components/camel-azure/camel-azure-storage-blob/src/generated/java/org/apache/camel/component/azure/storage/blob/BlobComponentConfigurer.java b/components/camel-azure/camel-azure-storage-blob/src/generated/java/org/apache/camel/component/azure/storage/blob/BlobComponentConfigurer.java
index c26c028..7b5ee4f 100644
--- a/components/camel-azure/camel-azure-storage-blob/src/generated/java/org/apache/camel/component/azure/storage/blob/BlobComponentConfigurer.java
+++ b/components/camel-azure/camel-azure-storage-blob/src/generated/java/org/apache/camel/component/azure/storage/blob/BlobComponentConfigurer.java
@@ -44,6 +44,12 @@ public class BlobComponentConfigurer extends PropertyConfigurerSupport implement
         case "blockListType": getOrCreateConfiguration(target).setBlockListType(property(camelContext, com.azure.storage.blob.models.BlockListType.class, value)); return true;
         case "bridgeerrorhandler":
         case "bridgeErrorHandler": target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); return true;
+        case "changefeedcontext":
+        case "changeFeedContext": getOrCreateConfiguration(target).setChangeFeedContext(property(camelContext, com.azure.core.util.Context.class, value)); return
true; + case "changefeedendtime": + case "changeFeedEndTime": getOrCreateConfiguration(target).setChangeFeedEndTime(property(camelContext, java.time.OffsetDateTime.class, value)); return true; + case "changefeedstarttime": + case "changeFeedStartTime": getOrCreateConfiguration(target).setChangeFeedStartTime(property(camelContext, java.time.OffsetDateTime.class, value)); return true; case "closestreamafterread": case "closeStreamAfterRead": getOrCreateConfiguration(target).setCloseStreamAfterRead(property(camelContext, boolean.class, value)); return true; case "closestreamafterwrite": @@ -104,6 +110,12 @@ public class BlobComponentConfigurer extends PropertyConfigurerSupport implement case "blockListType": return com.azure.storage.blob.models.BlockListType.class; case "bridgeerrorhandler": case "bridgeErrorHandler": return boolean.class; + case "changefeedcontext": + case "changeFeedContext": return com.azure.core.util.Context.class; + case "changefeedendtime": + case "changeFeedEndTime": return java.time.OffsetDateTime.class; + case "changefeedstarttime": + case "changeFeedStartTime": return java.time.OffsetDateTime.class; case "closestreamafterread": case "closeStreamAfterRead": return boolean.class; case "closestreamafterwrite": @@ -160,6 +172,12 @@ public class BlobComponentConfigurer extends PropertyConfigurerSupport implement case "blockListType": return getOrCreateConfiguration(target).getBlockListType(); case "bridgeerrorhandler": case "bridgeErrorHandler": return target.isBridgeErrorHandler(); + case "changefeedcontext": + case "changeFeedContext": return getOrCreateConfiguration(target).getChangeFeedContext(); + case "changefeedendtime": + case "changeFeedEndTime": return getOrCreateConfiguration(target).getChangeFeedEndTime(); + case "changefeedstarttime": + case "changeFeedStartTime": return getOrCreateConfiguration(target).getChangeFeedStartTime(); case "closestreamafterread": case "closeStreamAfterRead": return 
getOrCreateConfiguration(target).isCloseStreamAfterRead(); case "closestreamafterwrite": diff --git a/components/camel-azure/camel-azure-storage-blob/src/generated/java/org/apache/camel/component/azure/storage/blob/BlobEndpointConfigurer.java b/components/camel-azure/camel-azure-storage-blob/src/generated/java/org/apache/camel/component/azure/storage/blob/BlobEndpointConfigurer.java index 8491799..5a72e7a 100644 --- a/components/camel-azure/camel-azure-storage-blob/src/generated/java/org/apache/camel/component/azure/storage/blob/BlobEndpointConfigurer.java +++ b/components/camel-azure/camel-azure-storage-blob/src/generated/java/org/apache/camel/component/azure/storage/blob/BlobEndpointConfigurer.java @@ -43,6 +43,12 @@ public class BlobEndpointConfigurer extends PropertyConfigurerSupport implements case "blockListType": target.getConfiguration().setBlockListType(property(camelContext, com.azure.storage.blob.models.BlockListType.class, value)); return true; case "bridgeerrorhandler": case "bridgeErrorHandler": target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); return true; + case "changefeedcontext": + case "changeFeedContext": target.getConfiguration().setChangeFeedContext(property(camelContext, com.azure.core.util.Context.class, value)); return true; + case "changefeedendtime": + case "changeFeedEndTime": target.getConfiguration().setChangeFeedEndTime(property(camelContext, java.time.OffsetDateTime.class, value)); return true; + case "changefeedstarttime": + case "changeFeedStartTime": target.getConfiguration().setChangeFeedStartTime(property(camelContext, java.time.OffsetDateTime.class, value)); return true; case "closestreamafterread": case "closeStreamAfterRead": target.getConfiguration().setCloseStreamAfterRead(property(camelContext, boolean.class, value)); return true; case "closestreamafterwrite": @@ -135,6 +141,12 @@ public class BlobEndpointConfigurer extends PropertyConfigurerSupport implements case "blockListType": return 
com.azure.storage.blob.models.BlockListType.class; case "bridgeerrorhandler": case "bridgeErrorHandler": return boolean.class; + case "changefeedcontext": + case "changeFeedContext": return com.azure.core.util.Context.class; + case "changefeedendtime": + case "changeFeedEndTime": return java.time.OffsetDateTime.class; + case "changefeedstarttime": + case "changeFeedStartTime": return java.time.OffsetDateTime.class; case "closestreamafterread": case "closeStreamAfterRead": return boolean.class; case "closestreamafterwrite": @@ -223,6 +235,12 @@ public class BlobEndpointConfigurer extends PropertyConfigurerSupport implements case "blockListType": return target.getConfiguration().getBlockListType(); case "bridgeerrorhandler": case "bridgeErrorHandler": return target.isBridgeErrorHandler(); + case "changefeedcontext": + case "changeFeedContext": return target.getConfiguration().getChangeFeedContext(); + case "changefeedendtime": + case "changeFeedEndTime": return target.getConfiguration().getChangeFeedEndTime(); + case "changefeedstarttime": + case "changeFeedStartTime": return target.getConfiguration().getChangeFeedStartTime(); case "closestreamafterread": case "closeStreamAfterRead": return target.getConfiguration().isCloseStreamAfterRead(); case "closestreamafterwrite": diff --git a/components/camel-azure/camel-azure-storage-blob/src/generated/java/org/apache/camel/component/azure/storage/blob/BlobEndpointUriFactory.java b/components/camel-azure/camel-azure-storage-blob/src/generated/java/org/apache/camel/component/azure/storage/blob/BlobEndpointUriFactory.java index 37edd6e..6ab32be 100644 --- a/components/camel-azure/camel-azure-storage-blob/src/generated/java/org/apache/camel/component/azure/storage/blob/BlobEndpointUriFactory.java +++ b/components/camel-azure/camel-azure-storage-blob/src/generated/java/org/apache/camel/component/azure/storage/blob/BlobEndpointUriFactory.java @@ -20,52 +20,55 @@ public class BlobEndpointUriFactory extends 
org.apache.camel.support.component.E private static final Set<String> PROPERTY_NAMES; private static final Set<String> SECRET_PROPERTY_NAMES; static { - Set<String> props = new HashSet<>(46); + Set<String> props = new HashSet<>(49); props.add("blobName"); - props.add("backoffMultiplier"); - props.add("accountName"); - props.add("credentials"); props.add("prefix"); - props.add("createPageBlob"); props.add("initialDelay"); - props.add("blobOffset"); - props.add("timeout"); - props.add("dataCount"); - props.add("scheduler"); props.add("blobServiceClient"); - props.add("maxRetryRequests"); props.add("bridgeErrorHandler"); - props.add("useFixedDelay"); - props.add("runLoggingLevel"); props.add("containerName"); - props.add("backoffErrorThreshold"); props.add("closeStreamAfterRead"); props.add("greedy"); props.add("scheduledExecutorService"); props.add("closeStreamAfterWrite"); props.add("repeatCount"); - props.add("timeUnit"); props.add("maxResultsPerPage"); - props.add("downloadLinkExpiration"); props.add("sendEmptyMessageWhenIdle"); props.add("schedulerProperties"); - props.add("exchangePattern"); props.add("backoffIdleThreshold"); - props.add("blockListType"); props.add("createAppendBlob"); props.add("regex"); props.add("lazyStartProducer"); props.add("delay"); - props.add("pollStrategy"); props.add("blobSequenceNumber"); props.add("startScheduler"); props.add("accessKey"); props.add("commitBlockListLater"); + props.add("blobType"); + props.add("exceptionHandler"); + props.add("backoffMultiplier"); + props.add("accountName"); + props.add("credentials"); + props.add("createPageBlob"); + props.add("blobOffset"); + props.add("timeout"); + props.add("dataCount"); + props.add("scheduler"); + props.add("maxRetryRequests"); + props.add("useFixedDelay"); + props.add("runLoggingLevel"); + props.add("backoffErrorThreshold"); + props.add("changeFeedStartTime"); + props.add("timeUnit"); + props.add("downloadLinkExpiration"); + props.add("exchangePattern"); + 
props.add("changeFeedEndTime"); + props.add("blockListType"); + props.add("pollStrategy"); props.add("serviceClient"); props.add("fileDir"); - props.add("blobType"); + props.add("changeFeedContext"); props.add("pageBlobSize"); - props.add("exceptionHandler"); props.add("operation"); PROPERTY_NAMES = Collections.unmodifiableSet(props); Set<String> secretProps = new HashSet<>(1); diff --git a/components/camel-azure/camel-azure-storage-blob/src/generated/resources/org/apache/camel/component/azure/storage/blob/azure-storage-blob.json b/components/camel-azure/camel-azure-storage-blob/src/generated/resources/org/apache/camel/component/azure/storage/blob/azure-storage-blob.json index c1c2351..51bd6c4 100644 --- a/components/camel-azure/camel-azure-storage-blob/src/generated/resources/org/apache/camel/component/azure/storage/blob/azure-storage-blob.json +++ b/components/camel-azure/camel-azure-storage-blob/src/generated/resources/org/apache/camel/component/azure/storage/blob/azure-storage-blob.json @@ -39,6 +39,9 @@ "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a me [...] "blobSequenceNumber": { "kind": "property", "displayName": "Blob Sequence Number", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "0", "configurationClass": "org.apache.camel.component.azure.storage.blob.BlobConfiguration", "configurationField": "configuration", "description": "A user-controlled value that you can use to track requests. 
The value of [...] "blockListType": { "kind": "property", "displayName": "Block List Type", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "com.azure.storage.blob.models.BlockListType", "enum": [ "committed", "uncommitted", "all" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "COMMITTED", "configurationClass": "org.apache.camel.component.azure.storage.blob.BlobConfiguration", "configurationField": "configuration", "description": [...] + "changeFeedContext": { "kind": "property", "displayName": "Change Feed Context", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "com.azure.core.util.Context", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.storage.blob.BlobConfiguration", "configurationField": "configuration", "description": "When using getChangeFeed producer operation, this gives additional context that is [...] + "changeFeedEndTime": { "kind": "property", "displayName": "Change Feed End Time", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "java.time.OffsetDateTime", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.storage.blob.BlobConfiguration", "configurationField": "configuration", "description": "When using getChangeFeed producer operation, this filters the results to return event [...] 
+ "changeFeedStartTime": { "kind": "property", "displayName": "Change Feed Start Time", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "java.time.OffsetDateTime", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.storage.blob.BlobConfiguration", "configurationField": "configuration", "description": "When using getChangeFeed producer operation, this filters the results to return e [...] "closeStreamAfterWrite": { "kind": "property", "displayName": "Close Stream After Write", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.azure.storage.blob.BlobConfiguration", "configurationField": "configuration", "description": "Close the stream after write or keep it open, default is true" }, "commitBlockListLater": { "kind": "property", "displayName": "Commit Block List Later", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.azure.storage.blob.BlobConfiguration", "configurationField": "configuration", "description": "When is set to true, the staged blocks will not be committed directly." }, "createAppendBlob": { "kind": "property", "displayName": "Create Append Blob", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.azure.storage.blob.BlobConfiguration", "configurationField": "configuration", "description": "When is set to true, the append blocks will be created when committing append blocks." 
}, @@ -74,6 +77,9 @@ "pollStrategy": { "kind": "parameter", "displayName": "Poll Strategy", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.PollingConsumerPollStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "A pluggable org.apache.camel.PollingConsumerPollingStrategy allowing you to provide your custom implementation to control error handling usually occurred during the poll operation [...] "blobSequenceNumber": { "kind": "parameter", "displayName": "Blob Sequence Number", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "0", "configurationClass": "org.apache.camel.component.azure.storage.blob.BlobConfiguration", "configurationField": "configuration", "description": "A user-controlled value that you can use to track requests. The value o [...] "blockListType": { "kind": "parameter", "displayName": "Block List Type", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "com.azure.storage.blob.models.BlockListType", "enum": [ "committed", "uncommitted", "all" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "COMMITTED", "configurationClass": "org.apache.camel.component.azure.storage.blob.BlobConfiguration", "configurationField": "configuration", "description": [...] 
+ "changeFeedContext": { "kind": "parameter", "displayName": "Change Feed Context", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "com.azure.core.util.Context", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.storage.blob.BlobConfiguration", "configurationField": "configuration", "description": "When using getChangeFeed producer operation, this gives additional context that is [...] + "changeFeedEndTime": { "kind": "parameter", "displayName": "Change Feed End Time", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "java.time.OffsetDateTime", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.storage.blob.BlobConfiguration", "configurationField": "configuration", "description": "When using getChangeFeed producer operation, this filters the results to return even [...] + "changeFeedStartTime": { "kind": "parameter", "displayName": "Change Feed Start Time", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "java.time.OffsetDateTime", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.storage.blob.BlobConfiguration", "configurationField": "configuration", "description": "When using getChangeFeed producer operation, this filters the results to return [...] 
"closeStreamAfterWrite": { "kind": "parameter", "displayName": "Close Stream After Write", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.azure.storage.blob.BlobConfiguration", "configurationField": "configuration", "description": "Close the stream after write or keep it open, default is true" }, "commitBlockListLater": { "kind": "parameter", "displayName": "Commit Block List Later", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.azure.storage.blob.BlobConfiguration", "configurationField": "configuration", "description": "When is set to true, the staged blocks will not be committed directly." }, "createAppendBlob": { "kind": "parameter", "displayName": "Create Append Blob", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.azure.storage.blob.BlobConfiguration", "configurationField": "configuration", "description": "When is set to true, the append blocks will be created when committing append blo [...] 
diff --git a/components/camel-azure/camel-azure-storage-blob/src/main/docs/azure-storage-blob-component.adoc b/components/camel-azure/camel-azure-storage-blob/src/main/docs/azure-storage-blob-component.adoc index 1a6bf2c..11f7351 100644 --- a/components/camel-azure/camel-azure-storage-blob/src/main/docs/azure-storage-blob-component.adoc +++ b/components/camel-azure/camel-azure-storage-blob/src/main/docs/azure-storage-blob-component.adoc @@ -65,7 +65,7 @@ to("file://blobdirectory"); // component options: START -The Azure Storage Blob Service component supports 27 options, which are listed below. +The Azure Storage Blob Service component supports 30 options, which are listed below. @@ -89,6 +89,9 @@ The Azure Storage Blob Service component supports 27 options, which are listed b | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean | *blobSequenceNumber* (producer) | A user-controlled value that you can use to track requests. The value of the sequence number must be between 0 and 263 - 1.The default value is 0. | 0 | Long | *blockListType* (producer) | Specifies which type of blocks to return. There are 3 enums and the value can be one of: committed, uncommitted, all | COMMITTED | BlockListType +| *changeFeedContext* (producer) | When using getChangeFeed producer operation, this gives additional context that is passed through the Http pipeline during the service call. | | Context +| *changeFeedEndTime* (producer) | When using getChangeFeed producer operation, this filters the results to return events approximately before the end time. 
Note: A few events belonging to the next hour can also be returned. A few events belonging to this hour can be missing; to ensure all events from the hour are returned, round the end time up by an hour. | | OffsetDateTime +| *changeFeedStartTime* (producer) | When using getChangeFeed producer operation, this filters the results to return events approximately after the start time. Note: A few events belonging to the previous hour can also be returned. A few events belonging to this hour can be missing; to ensure all events from the hour are returned, round the start time down by an hour. | | OffsetDateTime | *closeStreamAfterWrite* (producer) | Close the stream after write or keep it open, default is true | true | boolean | *commitBlockListLater* (producer) | When is set to true, the staged blocks will not be committed directly. | true | boolean | *createAppendBlob* (producer) | When is set to true, the append blocks will be created when committing append blocks. | true | boolean @@ -122,7 +125,7 @@ with the following path and query parameters: |=== -=== Query Parameters (44 parameters): +=== Query Parameters (47 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -149,6 +152,9 @@ with the following path and query parameters: | *pollStrategy* (consumer) | A pluggable org.apache.camel.PollingConsumerPollingStrategy allowing you to provide your custom implementation to control error handling usually occurred during the poll operation before an Exchange have been created and being routed in Camel. | | PollingConsumerPollStrategy | *blobSequenceNumber* (producer) | A user-controlled value that you can use to track requests. The value of the sequence number must be between 0 and 263 - 1.The default value is 0. | 0 | Long | *blockListType* (producer) | Specifies which type of blocks to return. 
There are 3 enums and the value can be one of: committed, uncommitted, all | COMMITTED | BlockListType +| *changeFeedContext* (producer) | When using getChangeFeed producer operation, this gives additional context that is passed through the Http pipeline during the service call. | | Context +| *changeFeedEndTime* (producer) | When using getChangeFeed producer operation, this filters the results to return events approximately before the end time. Note: A few events belonging to the next hour can also be returned. A few events belonging to this hour can be missing; to ensure all events from the hour are returned, round the end time up by an hour. | | OffsetDateTime +| *changeFeedStartTime* (producer) | When using getChangeFeed producer operation, this filters the results to return events approximately after the start time. Note: A few events belonging to the previous hour can also be returned. A few events belonging to this hour can be missing; to ensure all events from the hour are returned, round the start time down by an hour. | | OffsetDateTime | *closeStreamAfterWrite* (producer) | Close the stream after write or keep it open, default is true | true | boolean | *commitBlockListLater* (producer) | When is set to true, the staged blocks will not be committed directly. | true | boolean | *createAppendBlob* (producer) | When is set to true, the append blocks will be created when committing append blocks. | true | boolean @@ -231,6 +237,9 @@ aggregate this number of messages. |`CamelAzureStorageBlobContainerName`|`BlobConstants.BLOB_CONTAINER_NAME`|`String`|Operations related to container and blob|Override/set the container name on the exchange headers. |`CamelAzureStorageBlobOperation`|`BlobConstants.BLOB_OPERATION`|`BlobOperationsDefinition`|All|Specify the producer operation to execute, please see the doc on this page related to producer operation. 
|`CamelAzureStorageBlobRegex`|`BlobConstants.REGEX`|`String`|`listBlobs`,`getBlob`|Filters the results to return only blobs whose names match the specified regular expression. May be null to return all. If both prefix and regex are set, regex takes the priority and prefix is ignored. +|`CamelAzureStorageBlobChangeFeedStartTime`|`BlobConstants.CHANGE_FEED_START_TIME`|`OffsetDateTime`|`getChangeFeed`| It filters the results to return events approximately after the start time. Note: A few events belonging to the previous hour can also be returned. A few events belonging to this hour can be missing; to ensure all events from the hour are returned, round the start time down by an hour. +|`CamelAzureStorageBlobChangeFeedEndTime`|`BlobConstants.CHANGE_FEED_END_TIME`|`OffsetDateTime`|`getChangeFeed`| It filters the results to return events approximately before the end time. Note: A few events belonging to the next hour can also be returned. A few events belonging to this hour can be missing; to ensure all events from the hour are returned, round the end time up by an hour. +|`CamelAzureStorageBlobChangeFeedContext`|`BlobConstants.CHANGE_FEED_CONTEXT`|`Context`|`getChangeFeed`| This gives additional context that is passed through the Http pipeline during the service call. |======================================================================= === Message headers set by either component producer or consumer @@ -316,6 +325,7 @@ For these operations, `accountName` is *required*. |=== |Operation |Description |`listBlobContainers` |Get the content of the blob. You can restrict the output of this operation to a blob range. +|`getChangeFeed` | Returns transaction logs of all the changes that occur to the blobs and the blob metadata in your storage account. The change feed provides ordered, guaranteed, durable, immutable, read-only log of these changes. 
|=== *Operations on the container level* diff --git a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConfiguration.java b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConfiguration.java index 75da69f..b6a5bff 100644 --- a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConfiguration.java +++ b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConfiguration.java @@ -17,7 +17,9 @@ package org.apache.camel.component.azure.storage.blob; import java.time.Duration; +import java.time.OffsetDateTime; +import com.azure.core.util.Context; import com.azure.storage.blob.BlobClient; import com.azure.storage.blob.BlobContainerClient; import com.azure.storage.blob.BlobServiceClient; @@ -85,6 +87,12 @@ public class BlobConfiguration implements Cloneable { private Long pageBlobSize = BlobConstants.PAGE_BLOB_DEFAULT_SIZE; @UriParam(label = "producer", defaultValue = "COMMITTED") private BlockListType blockListType = BlockListType.COMMITTED; + @UriParam(label = "producer") + private OffsetDateTime changeFeedStartTime; + @UriParam(label = "producer") + private OffsetDateTime changeFeedEndTime; + @UriParam(label = "producer") + private Context changeFeedContext; @UriParam(label = "common") private String regex; @@ -366,6 +374,44 @@ public class BlobConfiguration implements Cloneable { } /** + * When using `getChangeFeed` producer operation, this filters the results to return events approximately after the + * start time. Note: A few events belonging to the previous hour can also be returned. A few events belonging to + * this hour can be missing; to ensure all events from the hour are returned, round the start time down by an hour. 
+ */ + public OffsetDateTime getChangeFeedStartTime() { + return changeFeedStartTime; + } + + public void setChangeFeedStartTime(OffsetDateTime changeFeedStartTime) { + this.changeFeedStartTime = changeFeedStartTime; + } + + /** + * When using `getChangeFeed` producer operation, this filters the results to return events approximately before the + * end time. Note: A few events belonging to the next hour can also be returned. A few events belonging to this hour + * can be missing; to ensure all events from the hour are returned, round the end time up by an hour. + */ + public OffsetDateTime getChangeFeedEndTime() { + return changeFeedEndTime; + } + + public void setChangeFeedEndTime(OffsetDateTime changeFeedEndTime) { + this.changeFeedEndTime = changeFeedEndTime; + } + + /** + * When using `getChangeFeed` producer operation, this gives additional context that is passed through the Http + * pipeline during the service call. + */ + public Context getChangeFeedContext() { + return changeFeedContext; + } + + public void setChangeFeedContext(Context changeFeedContext) { + this.changeFeedContext = changeFeedContext; + } + + /** * Filters the results to return only blobs whose names match the specified regular expression. May be null to * return all if both prefix and regex are set, regex takes the priority and prefix is ignored. 
*/ diff --git a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConfigurationOptionsProxy.java b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConfigurationOptionsProxy.java index 76edf24..85df10d 100644 --- a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConfigurationOptionsProxy.java +++ b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConfigurationOptionsProxy.java @@ -17,10 +17,12 @@ package org.apache.camel.component.azure.storage.blob; import java.time.Duration; +import java.time.OffsetDateTime; import java.util.Map; import java.util.function.Function; import java.util.function.Supplier; +import com.azure.core.util.Context; import com.azure.storage.blob.models.AccessTier; import com.azure.storage.blob.models.BlobHttpHeaders; import com.azure.storage.blob.models.BlobListDetails; @@ -203,6 +205,19 @@ public class BlobConfigurationOptionsProxy { return configuration.getMaxRetryRequests(); } + public OffsetDateTime getChangeFeedStartTime(final Exchange exchange) { + return getOption(BlobExchangeHeaders::getChangeFeedStartTimeFromHeaders, configuration::getChangeFeedStartTime, + exchange); + } + + public OffsetDateTime getChangeFeedEndTime(final Exchange exchange) { + return getOption(BlobExchangeHeaders::getChangeFeedEndTimeFromHeaders, configuration::getChangeFeedEndTime, exchange); + } + + public Context getChangeFeedContext(final Exchange exchange) { + return getOption(BlobExchangeHeaders::getChangeFeedContextFromHeaders, configuration::getChangeFeedContext, exchange); + } + public BlobConfiguration getConfiguration() { return configuration; } diff --git a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConstants.java 
b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConstants.java index b2426c7..3f4bb2d 100644 --- a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConstants.java +++ b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConstants.java @@ -81,6 +81,10 @@ public final class BlobConstants { public static final String LIST_BLOB_CONTAINERS_OPTIONS = HEADER_PREFIX + "ListBlobContainersOptions"; public static final String PARALLEL_TRANSFER_OPTIONS = HEADER_PREFIX + "ParallelTransferOptions"; public static final String DOWNLOAD_LINK_EXPIRATION = HEADER_PREFIX + "DownloadLinkExpiration"; + // changefeed + public static final String CHANGE_FEED_START_TIME = HEADER_PREFIX + "ChangeFeedStartTime"; + public static final String CHANGE_FEED_END_TIME = HEADER_PREFIX + "ChangeFeedEndTime"; + public static final String CHANGE_FEED_CONTEXT = HEADER_PREFIX + "Context"; private BlobConstants() { } diff --git a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobExchangeHeaders.java b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobExchangeHeaders.java index 35b73c4..5876920 100644 --- a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobExchangeHeaders.java +++ b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobExchangeHeaders.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.Map; import com.azure.core.http.HttpHeaders; +import com.azure.core.util.Context; import com.azure.storage.blob.models.AccessTier; import com.azure.storage.blob.models.AppendBlobItem; import com.azure.storage.blob.models.ArchiveStatus; @@ -248,6 +249,18 @@ public class 
BlobExchangeHeaders { return getObjectFromHeaders(exchange, BlobConstants.BLOB_OPERATION, BlobOperationsDefinition.class); } + public static OffsetDateTime getChangeFeedStartTimeFromHeaders(final Exchange exchange) { + return getObjectFromHeaders(exchange, BlobConstants.CHANGE_FEED_START_TIME, OffsetDateTime.class); + } + + public static OffsetDateTime getChangeFeedEndTimeFromHeaders(final Exchange exchange) { + return getObjectFromHeaders(exchange, BlobConstants.CHANGE_FEED_END_TIME, OffsetDateTime.class); + } + + public static Context getChangeFeedContextFromHeaders(final Exchange exchange) { + return getObjectFromHeaders(exchange, BlobConstants.CHANGE_FEED_CONTEXT, Context.class); + } + private static <T> T getObjectFromHeaders(final Exchange exchange, final String headerName, final Class<T> classType) { return ObjectHelper.isEmpty(exchange) ? null : exchange.getIn().getHeader(headerName, classType); } diff --git a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobOperationsDefinition.java b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobOperationsDefinition.java index f94d07d..6b3ad4c 100644 --- a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobOperationsDefinition.java +++ b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobOperationsDefinition.java @@ -119,4 +119,9 @@ public enum BlobOperationsDefinition { * Returns the list of valid page ranges for a page blob or snapshot of a page blob. */ getPageBlobRanges, + /** + * Returns transaction logs of all the changes that occur to the blobs and the blob metadata in your storage + * account. The change feed provides ordered, guaranteed, durable, immutable, read-only log of these changes. 
+ */ + getChangeFeed } diff --git a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobProducer.java b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobProducer.java index 4e7c6de..fd71cb9 100644 --- a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobProducer.java +++ b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobProducer.java @@ -16,10 +16,13 @@ */ package org.apache.camel.component.azure.storage.blob; +import com.azure.storage.blob.changefeed.BlobChangefeedClient; +import com.azure.storage.blob.changefeed.BlobChangefeedClientBuilder; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.component.azure.storage.blob.client.BlobClientWrapper; import org.apache.camel.component.azure.storage.blob.client.BlobServiceClientWrapper; +import org.apache.camel.component.azure.storage.blob.operations.BlobChangeFeedOperations; import org.apache.camel.component.azure.storage.blob.operations.BlobContainerOperations; import org.apache.camel.component.azure.storage.blob.operations.BlobOperationResponse; import org.apache.camel.component.azure.storage.blob.operations.BlobOperations; @@ -117,6 +120,9 @@ public class BlobProducer extends DefaultProducer { case getPageBlobRanges: setResponse(exchange, getBlobOperations(exchange).getPageBlobRanges(exchange)); break; + case getChangeFeed: + setResponse(exchange, getBlobChangeFeedOperations().getEvents(exchange)); + break; default: throw new IllegalArgumentException("Unsupported operation"); } @@ -149,6 +155,13 @@ public class BlobProducer extends DefaultProducer { return new BlobOperations(configuration, clientWrapper); } + private BlobChangeFeedOperations getBlobChangeFeedOperations() { + final BlobChangefeedClient changefeedClient + = new 
BlobChangefeedClientBuilder(getEndpoint().getBlobServiceClient()).buildClient(); + + return new BlobChangeFeedOperations(changefeedClient, configurationProxy); + } + private String determineContainerName(final Exchange exchange) { final String containerName = configurationProxy.getContainerName(exchange); diff --git a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/operations/BlobChangeFeedOperations.java b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/operations/BlobChangeFeedOperations.java new file mode 100644 index 0000000..5e907ff --- /dev/null +++ b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/operations/BlobChangeFeedOperations.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.azure.storage.blob.operations; + +import java.time.OffsetDateTime; +import java.util.List; +import java.util.stream.Collectors; + +import com.azure.core.util.Context; +import com.azure.storage.blob.changefeed.BlobChangefeedClient; +import com.azure.storage.blob.changefeed.models.BlobChangefeedEvent; +import org.apache.camel.Exchange; +import org.apache.camel.component.azure.storage.blob.BlobConfigurationOptionsProxy; +import org.apache.camel.util.ObjectHelper; + +public class BlobChangeFeedOperations { + + private final BlobChangefeedClient client; + private final BlobConfigurationOptionsProxy configurationOptionsProxy; + + public BlobChangeFeedOperations(BlobChangefeedClient client, BlobConfigurationOptionsProxy configurationOptionsProxy) { + ObjectHelper.notNull(client, "client cannot be null"); + + this.client = client; + this.configurationOptionsProxy = configurationOptionsProxy; + } + + public BlobOperationResponse getEvents(final Exchange exchange) { + final OffsetDateTime startTime = configurationOptionsProxy.getChangeFeedStartTime(exchange); + final OffsetDateTime endTime = configurationOptionsProxy.getChangeFeedEndTime(exchange); + final Context context = configurationOptionsProxy.getChangeFeedContext(exchange); + final BlobOperationResponse response = new BlobOperationResponse(); + + if (ObjectHelper.isEmpty(startTime) || ObjectHelper.isEmpty(endTime)) { + response.setBody(getEvents()); + } else { + response.setBody(getEvents(startTime, endTime, context)); + } + + return response; + } + + private List<BlobChangefeedEvent> getEvents() { + return client.getEvents().stream().collect(Collectors.toList()); + } + + private List<BlobChangefeedEvent> getEvents( + final OffsetDateTime startTime, final OffsetDateTime endTime, final Context context) { + + return client.getEvents(startTime, endTime, context).stream().collect(Collectors.toList()); + } + +} diff --git 
a/components/camel-azure/camel-azure-storage-blob/src/test/java/org/apache/camel/component/azure/storage/blob/BlobComponentTest.java b/components/camel-azure/camel-azure-storage-blob/src/test/java/org/apache/camel/component/azure/storage/blob/BlobComponentTest.java index 0fc2066..48c3f4c 100644 --- a/components/camel-azure/camel-azure-storage-blob/src/test/java/org/apache/camel/component/azure/storage/blob/BlobComponentTest.java +++ b/components/camel-azure/camel-azure-storage-blob/src/test/java/org/apache/camel/component/azure/storage/blob/BlobComponentTest.java @@ -16,6 +16,10 @@ */ package org.apache.camel.component.azure.storage.blob; +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.util.Collections; import com.azure.storage.blob.BlobServiceClient; @@ -127,6 +131,31 @@ class BlobComponentTest extends CamelTestSupport { assertEquals("blob/sub", endpoint.getConfiguration().getBlobName()); } + @Test + void testCreateEndpointWithChangeFeedConfig() { + context.getRegistry().bind("creds", storageSharedKeyCredential()); + context.getRegistry().bind("metadata", Collections.emptyMap()); + context.getRegistry().bind("starttime", + OffsetDateTime.of(LocalDate.of(2021, 8, 4), LocalTime.of(11, 5), ZoneOffset.ofHours(0))); + context.getRegistry().bind("endtime", + OffsetDateTime.of(LocalDate.of(2021, 12, 4), LocalTime.of(11, 5), ZoneOffset.ofHours(0))); + + final String uri = "azure-storage-blob://camelazure" + + "?credentials=#creds" + + "&operation=getChangeFeed" + + "&changeFeedStartTime=#starttime" + + "&changeFeedEndTime=#endtime"; + final BlobEndpoint endpoint = (BlobEndpoint) context.getEndpoint(uri); + + assertEquals("camelazure", endpoint.getConfiguration().getAccountName()); + assertNull(endpoint.getConfiguration().getServiceClient()); + assertNotNull(endpoint.getConfiguration().getCredentials()); + + assertEquals(BlobOperationsDefinition.getChangeFeed, 
endpoint.getConfiguration().getOperation()); + assertEquals(OffsetDateTime.parse("2021-08-04T11:05Z"), endpoint.getConfiguration().getChangeFeedStartTime()); + assertEquals(OffsetDateTime.parse("2021-12-04T11:05Z"), endpoint.getConfiguration().getChangeFeedEndTime()); + } + private StorageSharedKeyCredential storageSharedKeyCredential() { return new StorageSharedKeyCredential("fakeuser", "fakekey"); } diff --git a/components/camel-azure/camel-azure-storage-blob/src/test/java/org/apache/camel/component/azure/storage/blob/integration/BlobChangeFeedOperationsIT.java b/components/camel-azure/camel-azure-storage-blob/src/test/java/org/apache/camel/component/azure/storage/blob/integration/BlobChangeFeedOperationsIT.java new file mode 100644 index 0000000..fa9bd42 --- /dev/null +++ b/components/camel-azure/camel-azure-storage-blob/src/test/java/org/apache/camel/component/azure/storage/blob/integration/BlobChangeFeedOperationsIT.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.azure.storage.blob.integration; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; + +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.changefeed.BlobChangefeedClient; +import com.azure.storage.blob.changefeed.BlobChangefeedClientBuilder; +import org.apache.camel.EndpointInject; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.azure.storage.blob.BlobConstants; +import org.apache.camel.component.azure.storage.blob.BlobUtils; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertNotNull; + +class BlobChangeFeedOperationsIT extends Base { + + private BlobChangefeedClient client; + private BlobContainerClient containerClient; + + @EndpointInject + private ProducerTemplate template; + + @EndpointInject("mock:result") + private MockEndpoint result; + private String resultName = "mock:result"; + + @BeforeAll + public void setupClient() { + client = new BlobChangefeedClientBuilder(serviceClient) + .buildClient(); + + // create test container + containerClient = serviceClient.getBlobContainerClient(containerName); + containerClient.create(); + } + + @Test + @Disabled("It is disabled due to changefeed support in the default test account") + void testGetChangeFeed() throws IOException, InterruptedException { + // create test blobs + final String blobName = RandomStringUtils.randomAlphabetic(10); + final String data = "Hello world from my awesome tests!"; + final InputStream dataStream = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)); + + 
containerClient.getBlobClient(blobName).getBlockBlobClient().upload(dataStream, + BlobUtils.getInputStreamLength(dataStream)); + + result.expectedMessageCount(1); + + // test feed + template.send("direct:getChangeFeed", exchange -> { + exchange.getIn().setHeader(BlobConstants.BLOB_NAME, blobName); + exchange.getIn().setBody("test"); + }); + + result.assertIsSatisfied(1000); + + // we have events + assertNotNull(result.getExchanges().get(0).getMessage().getBody()); + } + + @AfterAll + public void tearDown() { + containerClient.delete(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:getChangeFeed") + .to("azure-storage-blob://cameldev?operation=getChangeFeed") + .to(resultName); + } + }; + } +} diff --git a/components/camel-azure/camel-azure-storage-blob/src/test/resources/log4j2.properties b/components/camel-azure/camel-azure-storage-blob/src/test/resources/log4j2.properties index ae1c0f8f..e86cb5e 100644 --- a/components/camel-azure/camel-azure-storage-blob/src/test/resources/log4j2.properties +++ b/components/camel-azure/camel-azure-storage-blob/src/test/resources/log4j2.properties @@ -24,4 +24,4 @@ appender.out.name = out appender.out.layout.type = PatternLayout appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n rootLogger.level = INFO -rootLogger.appenderRef.file.ref = file \ No newline at end of file +rootLogger.appenderRef.file.ref = file diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/AzureStorageBlobComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/AzureStorageBlobComponentBuilderFactory.java index a6a0d87..2368e3e 100644 --- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/AzureStorageBlobComponentBuilderFactory.java +++ 
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/AzureStorageBlobComponentBuilderFactory.java @@ -348,6 +348,64 @@ public interface AzureStorageBlobComponentBuilderFactory { return this; } /** + * When using getChangeFeed producer operation, this gives additional + * context that is passed through the Http pipeline during the service + * call. + * + * The option is a: <code>com.azure.core.util.Context</code> + * type. + * + * Group: producer + * + * @param changeFeedContext the value to set + * @return the dsl builder + */ + default AzureStorageBlobComponentBuilder changeFeedContext( + com.azure.core.util.Context changeFeedContext) { + doSetProperty("changeFeedContext", changeFeedContext); + return this; + } + /** + * When using getChangeFeed producer operation, this filters the results + * to return events approximately before the end time. Note: A few + * events belonging to the next hour can also be returned. A few events + * belonging to this hour can be missing; to ensure all events from the + * hour are returned, round the end time up by an hour. + * + * The option is a: <code>java.time.OffsetDateTime</code> + * type. + * + * Group: producer + * + * @param changeFeedEndTime the value to set + * @return the dsl builder + */ + default AzureStorageBlobComponentBuilder changeFeedEndTime( + java.time.OffsetDateTime changeFeedEndTime) { + doSetProperty("changeFeedEndTime", changeFeedEndTime); + return this; + } + /** + * When using getChangeFeed producer operation, this filters the results + * to return events approximately after the start time. Note: A few + * events belonging to the previous hour can also be returned. A few + * events belonging to this hour can be missing; to ensure all events + * from the hour are returned, round the start time down by an hour. + * + * The option is a: <code>java.time.OffsetDateTime</code> + * type. 
+ * + * Group: producer + * + * @param changeFeedStartTime the value to set + * @return the dsl builder + */ + default AzureStorageBlobComponentBuilder changeFeedStartTime( + java.time.OffsetDateTime changeFeedStartTime) { + doSetProperty("changeFeedStartTime", changeFeedStartTime); + return this; + } + /** * Close the stream after write or keep it open, default is true. * * The option is a: <code>boolean</code> type. @@ -566,6 +624,9 @@ public interface AzureStorageBlobComponentBuilderFactory { case "bridgeErrorHandler": ((BlobComponent) component).setBridgeErrorHandler((boolean) value); return true; case "blobSequenceNumber": getOrCreateConfiguration((BlobComponent) component).setBlobSequenceNumber((java.lang.Long) value); return true; case "blockListType": getOrCreateConfiguration((BlobComponent) component).setBlockListType((com.azure.storage.blob.models.BlockListType) value); return true; + case "changeFeedContext": getOrCreateConfiguration((BlobComponent) component).setChangeFeedContext((com.azure.core.util.Context) value); return true; + case "changeFeedEndTime": getOrCreateConfiguration((BlobComponent) component).setChangeFeedEndTime((java.time.OffsetDateTime) value); return true; + case "changeFeedStartTime": getOrCreateConfiguration((BlobComponent) component).setChangeFeedStartTime((java.time.OffsetDateTime) value); return true; case "closeStreamAfterWrite": getOrCreateConfiguration((BlobComponent) component).setCloseStreamAfterWrite((boolean) value); return true; case "commitBlockListLater": getOrCreateConfiguration((BlobComponent) component).setCommitBlockListLater((boolean) value); return true; case "createAppendBlob": getOrCreateConfiguration((BlobComponent) component).setCreateAppendBlob((boolean) value); return true; diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/BlobEndpointBuilderFactory.java 
b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/BlobEndpointBuilderFactory.java index 86d703f..c3c78e1 100644 --- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/BlobEndpointBuilderFactory.java +++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/BlobEndpointBuilderFactory.java @@ -17,6 +17,7 @@ package org.apache.camel.builder.endpoint.dsl; import java.time.Duration; +import java.time.OffsetDateTime; import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -1670,6 +1671,122 @@ public interface BlobEndpointBuilderFactory { return this; } /** + * When using getChangeFeed producer operation, this gives additional + * context that is passed through the Http pipeline during the service + * call. + * + * The option is a: <code>com.azure.core.util.Context</code> + * type. + * + * Group: producer + * + * @param changeFeedContext the value to set + * @return the dsl builder + */ + default BlobEndpointProducerBuilder changeFeedContext( + Object changeFeedContext) { + doSetProperty("changeFeedContext", changeFeedContext); + return this; + } + /** + * When using getChangeFeed producer operation, this gives additional + * context that is passed through the Http pipeline during the service + * call. + * + * The option will be converted to a + * <code>com.azure.core.util.Context</code> type. + * + * Group: producer + * + * @param changeFeedContext the value to set + * @return the dsl builder + */ + default BlobEndpointProducerBuilder changeFeedContext( + String changeFeedContext) { + doSetProperty("changeFeedContext", changeFeedContext); + return this; + } + /** + * When using getChangeFeed producer operation, this filters the results + * to return events approximately before the end time. Note: A few + * events belonging to the next hour can also be returned. 
A few events + * belonging to this hour can be missing; to ensure all events from the + * hour are returned, round the end time up by an hour. + * + * The option is a: <code>java.time.OffsetDateTime</code> + * type. + * + * Group: producer + * + * @param changeFeedEndTime the value to set + * @return the dsl builder + */ + default BlobEndpointProducerBuilder changeFeedEndTime( + OffsetDateTime changeFeedEndTime) { + doSetProperty("changeFeedEndTime", changeFeedEndTime); + return this; + } + /** + * When using getChangeFeed producer operation, this filters the results + * to return events approximately before the end time. Note: A few + * events belonging to the next hour can also be returned. A few events + * belonging to this hour can be missing; to ensure all events from the + * hour are returned, round the end time up by an hour. + * + * The option will be converted to a + * <code>java.time.OffsetDateTime</code> type. + * + * Group: producer + * + * @param changeFeedEndTime the value to set + * @return the dsl builder + */ + default BlobEndpointProducerBuilder changeFeedEndTime( + String changeFeedEndTime) { + doSetProperty("changeFeedEndTime", changeFeedEndTime); + return this; + } + /** + * When using getChangeFeed producer operation, this filters the results + * to return events approximately after the start time. Note: A few + * events belonging to the previous hour can also be returned. A few + * events belonging to this hour can be missing; to ensure all events + * from the hour are returned, round the start time down by an hour. + * + * The option is a: <code>java.time.OffsetDateTime</code> + * type. 
+ * + * Group: producer + * + * @param changeFeedStartTime the value to set + * @return the dsl builder + */ + default BlobEndpointProducerBuilder changeFeedStartTime( + OffsetDateTime changeFeedStartTime) { + doSetProperty("changeFeedStartTime", changeFeedStartTime); + return this; + } + /** + * When using getChangeFeed producer operation, this filters the results + * to return events approximately after the start time. Note: A few + * events belonging to the previous hour can also be returned. A few + * events belonging to this hour can be missing; to ensure all events + * from the hour are returned, round the start time down by an hour. + * + * The option will be converted to a + * <code>java.time.OffsetDateTime</code> type. + * + * Group: producer + * + * @param changeFeedStartTime the value to set + * @return the dsl builder + */ + default BlobEndpointProducerBuilder changeFeedStartTime( + String changeFeedStartTime) { + doSetProperty("changeFeedStartTime", changeFeedStartTime); + return this; + } + /** * Close the stream after write or keep it open, default is true. * * The option is a: <code>boolean</code> type. @@ -2484,7 +2601,8 @@ public interface BlobEndpointBuilderFactory { uploadPageBlob, resizePageBlob, clearPageBlob, - getPageBlobRanges; + getPageBlobRanges, + getChangeFeed; } public interface BlobBuilders { diff --git a/docs/components/modules/ROOT/pages/azure-storage-blob-component.adoc b/docs/components/modules/ROOT/pages/azure-storage-blob-component.adoc index 0a421b5..361bcb7 100644 --- a/docs/components/modules/ROOT/pages/azure-storage-blob-component.adoc +++ b/docs/components/modules/ROOT/pages/azure-storage-blob-component.adoc @@ -67,7 +67,7 @@ to("file://blobdirectory"); // component options: START -The Azure Storage Blob Service component supports 27 options, which are listed below. +The Azure Storage Blob Service component supports 30 options, which are listed below. 
@@ -91,6 +91,9 @@ The Azure Storage Blob Service component supports 27 options, which are listed b | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean | *blobSequenceNumber* (producer) | A user-controlled value that you can use to track requests. The value of the sequence number must be between 0 and 263 - 1.The default value is 0. | 0 | Long | *blockListType* (producer) | Specifies which type of blocks to return. There are 3 enums and the value can be one of: committed, uncommitted, all | COMMITTED | BlockListType +| *changeFeedContext* (producer) | When using getChangeFeed producer operation, this gives additional context that is passed through the Http pipeline during the service call. | | Context +| *changeFeedEndTime* (producer) | When using getChangeFeed producer operation, this filters the results to return events approximately before the end time. Note: A few events belonging to the next hour can also be returned. A few events belonging to this hour can be missing; to ensure all events from the hour are returned, round the end time up by an hour. | | OffsetDateTime +| *changeFeedStartTime* (producer) | When using getChangeFeed producer operation, this filters the results to return events approximately after the start time. Note: A few events belonging to the previous hour can also be returned. A few events belonging to this hour can be missing; to ensure all events from the hour are returned, round the start time down by an hour. 
| | OffsetDateTime | *closeStreamAfterWrite* (producer) | Close the stream after write or keep it open, default is true | true | boolean | *commitBlockListLater* (producer) | When is set to true, the staged blocks will not be committed directly. | true | boolean | *createAppendBlob* (producer) | When is set to true, the append blocks will be created when committing append blocks. | true | boolean @@ -124,7 +127,7 @@ with the following path and query parameters: |=== -=== Query Parameters (44 parameters): +=== Query Parameters (47 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -151,6 +154,9 @@ with the following path and query parameters: | *pollStrategy* (consumer) | A pluggable org.apache.camel.PollingConsumerPollingStrategy allowing you to provide your custom implementation to control error handling usually occurred during the poll operation before an Exchange have been created and being routed in Camel. | | PollingConsumerPollStrategy | *blobSequenceNumber* (producer) | A user-controlled value that you can use to track requests. The value of the sequence number must be between 0 and 263 - 1.The default value is 0. | 0 | Long | *blockListType* (producer) | Specifies which type of blocks to return. There are 3 enums and the value can be one of: committed, uncommitted, all | COMMITTED | BlockListType +| *changeFeedContext* (producer) | When using getChangeFeed producer operation, this gives additional context that is passed through the Http pipeline during the service call. | | Context +| *changeFeedEndTime* (producer) | When using getChangeFeed producer operation, this filters the results to return events approximately before the end time. Note: A few events belonging to the next hour can also be returned. A few events belonging to this hour can be missing; to ensure all events from the hour are returned, round the end time up by an hour. 
| | OffsetDateTime +| *changeFeedStartTime* (producer) | When using getChangeFeed producer operation, this filters the results to return events approximately after the start time. Note: A few events belonging to the previous hour can also be returned. A few events belonging to this hour can be missing; to ensure all events from the hour are returned, round the start time down by an hour. | | OffsetDateTime | *closeStreamAfterWrite* (producer) | Close the stream after write or keep it open, default is true | true | boolean | *commitBlockListLater* (producer) | When is set to true, the staged blocks will not be committed directly. | true | boolean | *createAppendBlob* (producer) | When is set to true, the append blocks will be created when committing append blocks. | true | boolean @@ -233,6 +239,9 @@ aggregate this number of messages. |`CamelAzureStorageBlobContainerName`|`BlobConstants.BLOB_CONTAINER_NAME`|`String`|Operations related to container and blob|Override/set the container name on the exchange headers. |`CamelAzureStorageBlobOperation`|`BlobConstants.BLOB_OPERATION`|`BlobOperationsDefinition`|All|Specify the producer operation to execute, please see the doc on this page related to producer operation. |`CamelAzureStorageBlobRegex`|`BlobConstants.REGEX`|`String`|`listBlobs`,`getBlob`|Filters the results to return only blobs whose names match the specified regular expression. May be null to return all. If both prefix and regex are set, regex takes the priority and prefix is ignored. +|`CamelAzureStorageBlobChangeFeedStartTime`|`BlobConstants.CHANGE_FEED_START_TIME`|`OffsetDateTime`|`getChangeFeed`| It filters the results to return events approximately after the start time. Note: A few events belonging to the previous hour can also be returned. A few events belonging to this hour can be missing; to ensure all events from the hour are returned, round the start time down by an hour. 
+|`CamelAzureStorageBlobChangeFeedEndTime`|`BlobConstants.CHANGE_FEED_END_TIME`|`OffsetDateTime`|`getChangeFeed`| It filters the results to return events approximately before the end time. Note: A few events belonging to the next hour can also be returned. A few events belonging to this hour can be missing; to ensure all events from the hour are returned, round the end time up by an hour. +|`CamelAzureStorageBlobChangeFeedContext`|`BlobConstants.CHANGE_FEED_CONTEXT`|`Context`|`getChangeFeed`| This gives additional context that is passed through the Http pipeline during the service call. |======================================================================= === Message headers set by either component producer or consumer @@ -318,6 +327,7 @@ For these operations, `accountName` is *required*. |=== |Operation |Description |`listBlobContainers` |Get the content of the blob. You can restrict the output of this operation to a blob range. +|`getChangeFeed` | Returns transaction logs of all the changes that occur to the blobs and the blob metadata in your storage account. The change feed provides ordered, guaranteed, durable, immutable, read-only log of these changes. |=== *Operations on the container level* diff --git a/parent/pom.xml b/parent/pom.xml index 91aa534..8cfda1c 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -85,6 +85,7 @@ <axiom-version>1.2.14</axiom-version> <azure-sdk-bom-version>1.0.3</azure-sdk-bom-version> <azure-jackson2-version>2.11.3</azure-jackson2-version> + <azure-storage-blob-changedfeed-version>12.0.0-beta.8</azure-storage-blob-changedfeed-version> <beanio-version>2.1.0</beanio-version> <beanstalkd-client-version>1.4.6</beanstalkd-client-version> <bouncycastle-version>1.69</bouncycastle-version>
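The option descriptions added in this commit advise rounding the start time down and the end time up by an hour so that no events from the boundary hours are missed. The following is a minimal sketch of one way to read that advice, using only `java.time`. The class name `ChangeFeedWindow` and both helper methods are hypothetical and not part of the commit. Snapping to whole-hour boundaries in UTC is an assumption about how the hourly windows are aligned. The resulting values could then be supplied through the `changeFeedStartTime` and `changeFeedEndTime` options, or through the `CamelAzureStorageBlobChangeFeedStartTime` and `CamelAzureStorageBlobChangeFeedEndTime` headers documented above.

```java
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

// Hypothetical helper, not part of the commit: widens a requested time window
// to whole-hour boundaries before it is passed to the getChangeFeed operation.
public final class ChangeFeedWindow {

    private ChangeFeedWindow() {
    }

    // Round the start time down to the beginning of its hour.
    // Assumption: hour boundaries are evaluated in UTC.
    public static OffsetDateTime roundStartDown(OffsetDateTime start) {
        return start.withOffsetSameInstant(ZoneOffset.UTC)
                .truncatedTo(ChronoUnit.HOURS);
    }

    // Round the end time up to the beginning of the next hour,
    // unless it already sits exactly on an hour boundary.
    public static OffsetDateTime roundEndUp(OffsetDateTime end) {
        OffsetDateTime utc = end.withOffsetSameInstant(ZoneOffset.UTC);
        OffsetDateTime floor = utc.truncatedTo(ChronoUnit.HOURS);
        return floor.equals(utc) ? utc : floor.plusHours(1);
    }

    public static void main(String[] args) {
        // Example instants chosen for illustration only.
        OffsetDateTime start = OffsetDateTime.of(2021, 8, 5, 10, 50, 41, 0, ZoneOffset.UTC);
        OffsetDateTime end = OffsetDateTime.of(2021, 8, 5, 12, 15, 0, 0, ZoneOffset.UTC);

        System.out.println("start: " + roundStartDown(start));
        System.out.println("end:   " + roundEndUp(end));
    }
}
```

Widening the window this way may return a few extra events from the neighbouring hours, which the option descriptions already warn about, so callers that need an exact range would still filter the results by event time.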