This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 7ee8b4d7bdb6ed046737fff946022a4d628fd6ba Author: Vilmos Nagy <vilmos.n...@outlook.com> AuthorDate: Thu Jan 28 13:13:35 2021 +0100 [CAMEL-14871] doneFileName attribute on the S3Consumers --- .../component/aws/s3/S3EndpointConfigurer.java | 6 ++++++ .../org/apache/camel/component/aws/s3/aws-s3.json | 1 + .../camel/component/aws/s3/S3Configuration.java | 13 +++++++++++ .../apache/camel/component/aws/s3/S3Consumer.java | 22 ++++++++++++++++++- .../aws2/s3/AWS2S3EndpointConfigurer.java | 6 ++++++ .../apache/camel/component/aws2/s3/aws2-s3.json | 1 + .../component/aws2/s3/AWS2S3Configuration.java | 13 +++++++++++ .../camel/component/aws2/s3/AWS2S3Consumer.java | 25 ++++++++++++++++++++-- 8 files changed, 84 insertions(+), 3 deletions(-) diff --git a/components/camel-aws-s3/src/generated/java/org/apache/camel/component/aws/s3/S3EndpointConfigurer.java b/components/camel-aws-s3/src/generated/java/org/apache/camel/component/aws/s3/S3EndpointConfigurer.java index 6c77dfe..fb8d660 100644 --- a/components/camel-aws-s3/src/generated/java/org/apache/camel/component/aws/s3/S3EndpointConfigurer.java +++ b/components/camel-aws-s3/src/generated/java/org/apache/camel/component/aws/s3/S3EndpointConfigurer.java @@ -51,6 +51,8 @@ public class S3EndpointConfigurer extends PropertyConfigurerSupport implements G case "deleteafterwrite": case "deleteAfterWrite": target.getConfiguration().setDeleteAfterWrite(property(camelContext, boolean.class, value)); return true; case "delimiter": target.getConfiguration().setDelimiter(property(camelContext, java.lang.String.class, value)); return true; + case "donefilename": + case "doneFileName": target.getConfiguration().setDoneFileName(property(camelContext, java.lang.String.class, value)); return true; case "dualstackenabled": case "dualstackEnabled": target.getConfiguration().setDualstackEnabled(property(camelContext, boolean.class, value)); return true; case "encryptionmaterials": @@ -164,6 +166,8 @@ public class S3EndpointConfigurer extends PropertyConfigurerSupport implements G case "deleteafterwrite": case "deleteAfterWrite": return boolean.class; case "delimiter": return java.lang.String.class; + case "donefilename": + case "doneFileName": return java.lang.String.class; case "dualstackenabled": case "dualstackEnabled": return boolean.class; case "encryptionmaterials": @@ -278,6 +282,8 @@ public class S3EndpointConfigurer extends PropertyConfigurerSupport implements G case "deleteafterwrite": case "deleteAfterWrite": return target.getConfiguration().isDeleteAfterWrite(); case "delimiter": return target.getConfiguration().getDelimiter(); + case "donefilename": + case "doneFileName": return target.getConfiguration().getDoneFileName(); case "dualstackenabled": case "dualstackEnabled": return target.getConfiguration().isDualstackEnabled(); case "encryptionmaterials": diff --git a/components/camel-aws-s3/src/generated/resources/org/apache/camel/component/aws/s3/aws-s3.json b/components/camel-aws-s3/src/generated/resources/org/apache/camel/component/aws/s3/aws-s3.json index d541d1e..345b0ad 100644 --- a/components/camel-aws-s3/src/generated/resources/org/apache/camel/component/aws/s3/aws-s3.json +++ b/components/camel-aws-s3/src/generated/resources/org/apache/camel/component/aws/s3/aws-s3.json @@ -39,6 +39,7 @@ "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a me [...] "deleteAfterRead": { "kind": "property", "displayName": "Delete After Read", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.aws.s3.S3Configuration", "configurationField": "configuration", "description": "Delete objects from S3 after they have been retrieved. The delete is only performed if the Exchang [...] "delimiter": { "kind": "property", "displayName": "Delimiter", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws.s3.S3Configuration", "configurationField": "configuration", "description": "The delimiter which is used in the com.amazonaws.services.s3.model.ListObjectsRequest to only consume objects we are intereste [...] + "doneFileName": { "kind": "parameter", "displayName": "Done File Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "Consumer: If provided, Camel will only consume files if a done file exists. This option configures what file name to use. Either you can specify a fixed name. Or you can use dynamic placeholders. The done file is always expected in the same folder as [...] "fileName": { "kind": "property", "displayName": "File Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws.s3.S3Configuration", "configurationField": "configuration", "description": "To get the object from the bucket with the given file name" }, "includeBody": { "kind": "property", "displayName": "Include Body", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.aws.s3.S3Configuration", "configurationField": "configuration", "description": "If it is true, the exchange body will be set to a stream to the contents of the file. If false, the headers [...] "prefix": { "kind": "property", "displayName": "Prefix", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws.s3.S3Configuration", "configurationField": "configuration", "description": "The prefix which is used in the com.amazonaws.services.s3.model.ListObjectsRequest to only consume objects we are interested in." }, diff --git a/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java b/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java index f76302d..1f1329f 100644 --- a/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java +++ b/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java @@ -43,6 +43,8 @@ public class S3Configuration implements Cloneable { private String prefix; @UriParam(label = "consumer") private String delimiter; + @UriParam(label = "consumer") + private String doneFileName; @UriParam private String region; @UriParam(label = "consumer", defaultValue = "true") @@ -192,6 +194,17 @@ public class S3Configuration implements Cloneable { this.delimiter = delimiter; } + public String getDoneFileName() { + return doneFileName; + } + + /** + * If provided, Camel will only consume files if a done file exists. + */ + public void setDoneFileName(String doneFileName) { + this.doneFileName = doneFileName; + } + public String getBucketName() { return bucketName; } diff --git a/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java b/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java index fd74577..41a2f15 100644 --- a/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java +++ b/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java @@ -24,6 +24,7 @@ import java.util.Queue; import com.amazonaws.AmazonClientException; import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.AmazonS3Exception; import com.amazonaws.services.s3.model.GetObjectRequest; import com.amazonaws.services.s3.model.ListObjectsRequest; import com.amazonaws.services.s3.model.ObjectListing; @@ -66,9 +67,12 @@ public class S3Consumer extends ScheduledBatchPollingConsumer { String fileName = getConfiguration().getFileName(); String bucketName = getConfiguration().getBucketName(); + String doneFileName = getConfiguration().getDoneFileName(); Queue<Exchange> exchanges; - if (fileName != null) { + if (shouldSkipCauseDoneFileIsConfiguredButMissing(bucketName, doneFileName)) { + exchanges = new LinkedList<>(); + } else if (fileName != null) { LOG.trace("Getting object in bucket [{}] with file name [{}]...", bucketName, fileName); S3Object s3Object = getAmazonS3Client().getObject(new GetObjectRequest(bucketName, fileName)); @@ -108,6 +112,22 @@ public class S3Consumer extends ScheduledBatchPollingConsumer { return processBatch(CastUtils.cast(exchanges)); } + private boolean shouldSkipCauseDoneFileIsConfiguredButMissing(String bucketName, String doneFileName) { + if (doneFileName == null) { + return false; + } else { + try { + getAmazonS3Client().getObjectMetadata(bucketName, doneFileName); + return false; + } catch(AmazonS3Exception e) { + if (e.getStatusCode() == 404) { + return true; + } + throw e; + } + } + } + protected Queue<Exchange> createExchanges(S3Object s3Object) { Queue<Exchange> answer = new LinkedList<>(); Exchange exchange = getEndpoint().createExchange(s3Object); diff --git a/components/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3EndpointConfigurer.java b/components/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3EndpointConfigurer.java index fe10e28..eff2b24 100644 --- a/components/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3EndpointConfigurer.java +++ b/components/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3EndpointConfigurer.java @@ -53,6 +53,8 @@ public class AWS2S3EndpointConfigurer extends PropertyConfigurerSupport implemen case "deleteafterwrite": case "deleteAfterWrite": target.getConfiguration().setDeleteAfterWrite(property(camelContext, boolean.class, value)); return true; case "delimiter": target.getConfiguration().setDelimiter(property(camelContext, java.lang.String.class, value)); return true; + case "donefilename": + case "doneFileName": target.getConfiguration().setDoneFileName(property(camelContext, java.lang.String.class, value)); return true; case "destinationbucket": case "destinationBucket": target.getConfiguration().setDestinationBucket(property(camelContext, java.lang.String.class, value)); return true; case "destinationbucketprefix": @@ -177,6 +179,8 @@ public class AWS2S3EndpointConfigurer extends PropertyConfigurerSupport implemen case "deleteafterwrite": case "deleteAfterWrite": return boolean.class; case "delimiter": return java.lang.String.class; + case "donefilename": + case "doneFileName": return java.lang.String.class; case "destinationbucket": case "destinationBucket": return java.lang.String.class; case "destinationbucketprefix": @@ -297,6 +301,8 @@ public class AWS2S3EndpointConfigurer extends PropertyConfigurerSupport implemen case "deleteafterwrite": case "deleteAfterWrite": return target.getConfiguration().isDeleteAfterWrite(); case "delimiter": return target.getConfiguration().getDelimiter(); + case "donefilename": + case "doneFileName": return target.getConfiguration().getDoneFileName(); case "destinationbucket": case "destinationBucket": return target.getConfiguration().getDestinationBucket(); case "destinationbucketprefix": diff --git a/components/camel-aws2-s3/src/generated/resources/org/apache/camel/component/aws2/s3/aws2-s3.json b/components/camel-aws2-s3/src/generated/resources/org/apache/camel/component/aws2/s3/aws2-s3.json index bb5a4f4..1464f9a 100644 --- a/components/camel-aws2-s3/src/generated/resources/org/apache/camel/component/aws2/s3/aws2-s3.json +++ b/components/camel-aws2-s3/src/generated/resources/org/apache/camel/component/aws2/s3/aws2-s3.json @@ -42,6 +42,7 @@ "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a me [...] "deleteAfterRead": { "kind": "property", "displayName": "Delete After Read", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "Delete objects from S3 after they have been retrieved. The delete is only performed if the Ex [...] "delimiter": { "kind": "property", "displayName": "Delimiter", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "The delimiter which is used in the com.amazonaws.services.s3.model.ListObjectsRequest to only consume objects we are inte [...] + "doneFileName": { "kind": "parameter", "displayName": "Done File Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "Consumer: If provided, Camel will only consume files if a done file exists. This option configures what file name to use. Either you can specify a fixed name. Or you can use dynamic placeholders. The done file is always expected in the same folder as [...] "destinationBucket": { "kind": "property", "displayName": "Destination Bucket", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "Define the destination bucket where an object must be moved when moveAfterRead is set to true." }, "destinationBucketPrefix": { "kind": "property", "displayName": "Destination Bucket Prefix", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "Define the destination bucket prefix to use when an object must be moved and moveAfterRead [...] "destinationBucketSuffix": { "kind": "property", "displayName": "Destination Bucket Suffix", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "Define the destination bucket suffix to use when an object must be moved and moveAfterRead [...] diff --git a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java index 9e70d51..742362a 100644 --- a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java +++ b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java @@ -44,6 +44,8 @@ public class AWS2S3Configuration implements Cloneable { private String prefix; @UriParam(label = "consumer") private String delimiter; + @UriParam(label = "consumer") + private String doneFileName; @UriParam(label = "consumer", defaultValue = "true") private boolean includeFolders = true; @UriParam @@ -189,6 +191,17 @@ public class AWS2S3Configuration implements Cloneable { this.delimiter = delimiter; } + public String getDoneFileName() { + return doneFileName; + } + + /** + * If provided, Camel will only consume files if a done file exists. + */ + public void setDoneFileName(String doneFileName) { + this.doneFileName = doneFileName; + } + /** * If it is true, the folders/directories will be consumed. If it is false, they will be ignored, and Exchanges will * not be created for those diff --git a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java index 405f941..f59a81d 100644 --- a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java +++ b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java @@ -47,8 +47,10 @@ import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectRequest.Builder; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.services.s3.model.HeadBucketRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectRequest; import software.amazon.awssdk.services.s3.model.ListObjectsRequest; import software.amazon.awssdk.services.s3.model.ListObjectsResponse; +import software.amazon.awssdk.services.s3.model.NoSuchKeyException; import software.amazon.awssdk.services.s3.model.S3Object; /** @@ -109,9 +111,12 @@ public class AWS2S3Consumer extends ScheduledBatchPollingConsumer { String fileName = getConfiguration().getFileName(); String bucketName = getConfiguration().getBucketName(); + String doneFileName = getConfiguration().getDoneFileName(); Queue<Exchange> exchanges; - if (fileName != null) { + if (shouldSkipCauseDoneFileIsConfiguredButMissing(bucketName, doneFileName)) { + exchanges = new LinkedList<>(); + } else if (fileName != null) { LOG.trace("Getting object in bucket [{}] with file name [{}]...", bucketName, fileName); ResponseInputStream<GetObjectResponse> s3Object @@ -153,6 +158,22 @@ public class AWS2S3Consumer extends ScheduledBatchPollingConsumer { return processBatch(CastUtils.cast(exchanges)); } + private boolean shouldSkipCauseDoneFileIsConfiguredButMissing(String bucketName, String doneFileName) { + if (doneFileName == null) { + return false; + } else { + HeadObjectRequest.Builder headObjectsRequest = HeadObjectRequest.builder(); + headObjectsRequest.bucket(bucketName); + headObjectsRequest.key(doneFileName); + try { + getAmazonS3Client().headObject(headObjectsRequest.build()); + return false; + } catch(NoSuchKeyException e) { + return true; + } + } + } + protected Queue<Exchange> createExchanges(ResponseInputStream<GetObjectResponse> s3Object, String key) { Queue<Exchange> answer = new LinkedList<>(); Exchange exchange = getEndpoint().createExchange(s3Object, key); @@ -190,7 +211,7 @@ public class AWS2S3Consumer extends ScheduledBatchPollingConsumer { Exchange exchange = getEndpoint().createExchange(s3Object, s3ObjectSummary.key()); answer.add(exchange); } else { - // If includeFolders != true and the object is not included, it is safe to close the object here. + // If includeFolders != true and the object is not included, it is safe to close the object here. // If includeFolders == true, the exchange will close the object. IOHelper.close(s3Object); }