This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 7ee8b4d7bdb6ed046737fff946022a4d628fd6ba
Author: Vilmos Nagy <vilmos.n...@outlook.com>
AuthorDate: Thu Jan 28 13:13:35 2021 +0100

    [CAMEL-14871] doneFileName attribute on the S3Consumers
---
 .../component/aws/s3/S3EndpointConfigurer.java     |  6 ++++++
 .../org/apache/camel/component/aws/s3/aws-s3.json  |  1 +
 .../camel/component/aws/s3/S3Configuration.java    | 13 +++++++++++
 .../apache/camel/component/aws/s3/S3Consumer.java  | 22 ++++++++++++++++++-
 .../aws2/s3/AWS2S3EndpointConfigurer.java          |  6 ++++++
 .../apache/camel/component/aws2/s3/aws2-s3.json    |  1 +
 .../component/aws2/s3/AWS2S3Configuration.java     | 13 +++++++++++
 .../camel/component/aws2/s3/AWS2S3Consumer.java    | 25 ++++++++++++++++++++--
 8 files changed, 84 insertions(+), 3 deletions(-)

diff --git 
a/components/camel-aws-s3/src/generated/java/org/apache/camel/component/aws/s3/S3EndpointConfigurer.java
 
b/components/camel-aws-s3/src/generated/java/org/apache/camel/component/aws/s3/S3EndpointConfigurer.java
index 6c77dfe..fb8d660 100644
--- 
a/components/camel-aws-s3/src/generated/java/org/apache/camel/component/aws/s3/S3EndpointConfigurer.java
+++ 
b/components/camel-aws-s3/src/generated/java/org/apache/camel/component/aws/s3/S3EndpointConfigurer.java
@@ -51,6 +51,8 @@ public class S3EndpointConfigurer extends 
PropertyConfigurerSupport implements G
         case "deleteafterwrite":
         case "deleteAfterWrite": 
target.getConfiguration().setDeleteAfterWrite(property(camelContext, 
boolean.class, value)); return true;
         case "delimiter": 
target.getConfiguration().setDelimiter(property(camelContext, 
java.lang.String.class, value)); return true;
+        case "donefilename":
+        case "doneFileName": 
target.getConfiguration().setDoneFileName(property(camelContext, 
java.lang.String.class, value)); return true;
         case "dualstackenabled":
         case "dualstackEnabled": 
target.getConfiguration().setDualstackEnabled(property(camelContext, 
boolean.class, value)); return true;
         case "encryptionmaterials":
@@ -164,6 +166,8 @@ public class S3EndpointConfigurer extends 
PropertyConfigurerSupport implements G
         case "deleteafterwrite":
         case "deleteAfterWrite": return boolean.class;
         case "delimiter": return java.lang.String.class;
+        case "donefilename":
+        case "doneFileName": return java.lang.String.class;
         case "dualstackenabled":
         case "dualstackEnabled": return boolean.class;
         case "encryptionmaterials":
@@ -278,6 +282,8 @@ public class S3EndpointConfigurer extends 
PropertyConfigurerSupport implements G
         case "deleteafterwrite":
         case "deleteAfterWrite": return 
target.getConfiguration().isDeleteAfterWrite();
         case "delimiter": return target.getConfiguration().getDelimiter();
+        case "donefilename":
+        case "doneFileName": return 
target.getConfiguration().getDoneFileName();
         case "dualstackenabled":
         case "dualstackEnabled": return 
target.getConfiguration().isDualstackEnabled();
         case "encryptionmaterials":
diff --git 
a/components/camel-aws-s3/src/generated/resources/org/apache/camel/component/aws/s3/aws-s3.json
 
b/components/camel-aws-s3/src/generated/resources/org/apache/camel/component/aws/s3/aws-s3.json
index d541d1e..345b0ad 100644
--- 
a/components/camel-aws-s3/src/generated/resources/org/apache/camel/component/aws/s3/aws-s3.json
+++ 
b/components/camel-aws-s3/src/generated/resources/org/apache/camel/component/aws/s3/aws-s3.json
@@ -39,6 +39,7 @@
     "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error 
Handler", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Allows for bridging the 
consumer to the Camel routing Error Handler, which mean any exceptions occurred 
while the consumer is trying to pickup incoming messages, or the likes, will 
now be processed as a me [...]
     "deleteAfterRead": { "kind": "property", "displayName": "Delete After 
Read", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": true, "configurationClass": 
"org.apache.camel.component.aws.s3.S3Configuration", "configurationField": 
"configuration", "description": "Delete objects from S3 after they have been 
retrieved. The delete is only performed if the Exchang [...]
     "delimiter": { "kind": "property", "displayName": "Delimiter", "group": 
"consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.aws.s3.S3Configuration", "configurationField": 
"configuration", "description": "The delimiter which is used in the 
com.amazonaws.services.s3.model.ListObjectsRequest to only consume objects we 
are intereste [...]
+    "doneFileName": { "kind": "parameter", "displayName": "Done File Name", 
"group": "consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "secret": false, 
"description": "Consumer: If provided, Camel will only consume files if a done 
file exists. This option configures what file name to use. Either you can 
specify a fixed name. Or you can use dynamic placeholders. The done file is 
always expected in the same folder as  [...]
     "fileName": { "kind": "property", "displayName": "File Name", "group": 
"consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.aws.s3.S3Configuration", "configurationField": 
"configuration", "description": "To get the object from the bucket with the 
given file name" },
     "includeBody": { "kind": "property", "displayName": "Include Body", 
"group": "consumer", "label": "consumer", "required": false, "type": "boolean", 
"javaType": "boolean", "deprecated": false, "autowired": false, "secret": 
false, "defaultValue": true, "configurationClass": 
"org.apache.camel.component.aws.s3.S3Configuration", "configurationField": 
"configuration", "description": "If it is true, the exchange body will be set 
to a stream to the contents of the file. If false, the headers [...]
     "prefix": { "kind": "property", "displayName": "Prefix", "group": 
"consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.aws.s3.S3Configuration", "configurationField": 
"configuration", "description": "The prefix which is used in the 
com.amazonaws.services.s3.model.ListObjectsRequest to only consume objects we 
are interested in." },
diff --git 
a/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
 
b/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
index f76302d..1f1329f 100644
--- 
a/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
+++ 
b/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
@@ -43,6 +43,8 @@ public class S3Configuration implements Cloneable {
     private String prefix;
     @UriParam(label = "consumer")
     private String delimiter;
+    @UriParam(label = "consumer")
+    private String doneFileName;
     @UriParam
     private String region;
     @UriParam(label = "consumer", defaultValue = "true")
@@ -192,6 +194,17 @@ public class S3Configuration implements Cloneable {
         this.delimiter = delimiter;
     }
 
+    public String getDoneFileName() {
+        return doneFileName;
+    }
+
+    /**
+     * If provided, Camel will only consume files if a done file exists.
+     */
+    public void setDoneFileName(String doneFileName) {
+        this.doneFileName = doneFileName;
+    }
+
     public String getBucketName() {
         return bucketName;
     }
diff --git 
a/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
 
b/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
index fd74577..41a2f15 100644
--- 
a/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
+++ 
b/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
@@ -24,6 +24,7 @@ import java.util.Queue;
 
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.ListObjectsRequest;
 import com.amazonaws.services.s3.model.ObjectListing;
@@ -66,9 +67,12 @@ public class S3Consumer extends 
ScheduledBatchPollingConsumer {
 
         String fileName = getConfiguration().getFileName();
         String bucketName = getConfiguration().getBucketName();
+        String doneFileName = getConfiguration().getDoneFileName();
         Queue<Exchange> exchanges;
 
-        if (fileName != null) {
+        if (shouldSkipCauseDoneFileIsConfiguredButMissing(bucketName, 
doneFileName)) {
+            exchanges = new LinkedList<>();
+        } else if (fileName != null) {
             LOG.trace("Getting object in bucket [{}] with file name [{}]...", 
bucketName, fileName);
 
             S3Object s3Object = getAmazonS3Client().getObject(new 
GetObjectRequest(bucketName, fileName));
@@ -108,6 +112,22 @@ public class S3Consumer extends 
ScheduledBatchPollingConsumer {
         return processBatch(CastUtils.cast(exchanges));
     }
 
+    private boolean shouldSkipCauseDoneFileIsConfiguredButMissing(String 
bucketName, String doneFileName) {
+        if (doneFileName == null) {
+            return false;
+        } else {
+            try {
+                getAmazonS3Client().getObjectMetadata(bucketName, 
doneFileName);
+                return false;
+            } catch(AmazonS3Exception e) {
+                if (e.getStatusCode() == 404) {
+                    return true;
+                }
+                throw e;
+            }
+        }
+    }
+
     protected Queue<Exchange> createExchanges(S3Object s3Object) {
         Queue<Exchange> answer = new LinkedList<>();
         Exchange exchange = getEndpoint().createExchange(s3Object);
diff --git 
a/components/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3EndpointConfigurer.java
 
b/components/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3EndpointConfigurer.java
index fe10e28..eff2b24 100644
--- 
a/components/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3EndpointConfigurer.java
+++ 
b/components/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3EndpointConfigurer.java
@@ -53,6 +53,8 @@ public class AWS2S3EndpointConfigurer extends 
PropertyConfigurerSupport implemen
         case "deleteafterwrite":
         case "deleteAfterWrite": 
target.getConfiguration().setDeleteAfterWrite(property(camelContext, 
boolean.class, value)); return true;
         case "delimiter": 
target.getConfiguration().setDelimiter(property(camelContext, 
java.lang.String.class, value)); return true;
+        case "donefilename":
+        case "doneFileName": 
target.getConfiguration().setDoneFileName(property(camelContext, 
java.lang.String.class, value)); return true;
         case "destinationbucket":
         case "destinationBucket": 
target.getConfiguration().setDestinationBucket(property(camelContext, 
java.lang.String.class, value)); return true;
         case "destinationbucketprefix":
@@ -177,6 +179,8 @@ public class AWS2S3EndpointConfigurer extends 
PropertyConfigurerSupport implemen
         case "deleteafterwrite":
         case "deleteAfterWrite": return boolean.class;
         case "delimiter": return java.lang.String.class;
+        case "donefilename":
+        case "doneFileName": return java.lang.String.class;
         case "destinationbucket":
         case "destinationBucket": return java.lang.String.class;
         case "destinationbucketprefix":
@@ -297,6 +301,8 @@ public class AWS2S3EndpointConfigurer extends 
PropertyConfigurerSupport implemen
         case "deleteafterwrite":
         case "deleteAfterWrite": return 
target.getConfiguration().isDeleteAfterWrite();
         case "delimiter": return target.getConfiguration().getDelimiter();
+        case "donefilename":
+        case "doneFileName": return 
target.getConfiguration().getDoneFileName();
         case "destinationbucket":
         case "destinationBucket": return 
target.getConfiguration().getDestinationBucket();
         case "destinationbucketprefix":
diff --git 
a/components/camel-aws2-s3/src/generated/resources/org/apache/camel/component/aws2/s3/aws2-s3.json
 
b/components/camel-aws2-s3/src/generated/resources/org/apache/camel/component/aws2/s3/aws2-s3.json
index bb5a4f4..1464f9a 100644
--- 
a/components/camel-aws2-s3/src/generated/resources/org/apache/camel/component/aws2/s3/aws2-s3.json
+++ 
b/components/camel-aws2-s3/src/generated/resources/org/apache/camel/component/aws2/s3/aws2-s3.json
@@ -42,6 +42,7 @@
     "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error 
Handler", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Allows for bridging the 
consumer to the Camel routing Error Handler, which mean any exceptions occurred 
while the consumer is trying to pickup incoming messages, or the likes, will 
now be processed as a me [...]
     "deleteAfterRead": { "kind": "property", "displayName": "Delete After 
Read", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": true, "configurationClass": 
"org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": 
"configuration", "description": "Delete objects from S3 after they have been 
retrieved. The delete is only performed if the Ex [...]
     "delimiter": { "kind": "property", "displayName": "Delimiter", "group": 
"consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": 
"configuration", "description": "The delimiter which is used in the 
com.amazonaws.services.s3.model.ListObjectsRequest to only consume objects we 
are inte [...]
+    "doneFileName": { "kind": "parameter", "displayName": "Done File Name", 
"group": "consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "secret": false, 
"description": "Consumer: If provided, Camel will only consume files if a done 
file exists. This option configures what file name to use. Either you can 
specify a fixed name. Or you can use dynamic placeholders. The done file is 
always expected in the same folder as  [...]
     "destinationBucket": { "kind": "property", "displayName": "Destination 
Bucket", "group": "consumer", "label": "consumer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": 
"configuration", "description": "Define the destination bucket where an object 
must be moved when moveAfterRead is set to true." },
     "destinationBucketPrefix": { "kind": "property", "displayName": 
"Destination Bucket Prefix", "group": "consumer", "label": "consumer", 
"required": false, "type": "string", "javaType": "java.lang.String", 
"deprecated": false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": 
"configuration", "description": "Define the destination bucket prefix to use 
when an object must be moved and moveAfterRead  [...]
     "destinationBucketSuffix": { "kind": "property", "displayName": 
"Destination Bucket Suffix", "group": "consumer", "label": "consumer", 
"required": false, "type": "string", "javaType": "java.lang.String", 
"deprecated": false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": 
"configuration", "description": "Define the destination bucket suffix to use 
when an object must be moved and moveAfterRead  [...]
diff --git 
a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java
 
b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java
index 9e70d51..742362a 100644
--- 
a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java
+++ 
b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java
@@ -44,6 +44,8 @@ public class AWS2S3Configuration implements Cloneable {
     private String prefix;
     @UriParam(label = "consumer")
     private String delimiter;
+    @UriParam(label = "consumer")
+    private String doneFileName;
     @UriParam(label = "consumer", defaultValue = "true")
     private boolean includeFolders = true;
     @UriParam
@@ -189,6 +191,17 @@ public class AWS2S3Configuration implements Cloneable {
         this.delimiter = delimiter;
     }
 
+    public String getDoneFileName() {
+        return doneFileName;
+    }
+
+    /**
+     * If provided, Camel will only consume files if a done file exists.
+     */
+    public void setDoneFileName(String doneFileName) {
+        this.doneFileName = doneFileName;
+    }
+
     /**
      * If it is true, the folders/directories will be consumed. If it is 
false, they will be ignored, and Exchanges will
      * not be created for those
diff --git 
a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java
 
b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java
index 405f941..f59a81d 100644
--- 
a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java
+++ 
b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java
@@ -47,8 +47,10 @@ import 
software.amazon.awssdk.services.s3.model.GetObjectRequest;
 import software.amazon.awssdk.services.s3.model.GetObjectRequest.Builder;
 import software.amazon.awssdk.services.s3.model.GetObjectResponse;
 import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
 import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
 import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
+import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
 import software.amazon.awssdk.services.s3.model.S3Object;
 
 /**
@@ -109,9 +111,12 @@ public class AWS2S3Consumer extends 
ScheduledBatchPollingConsumer {
 
         String fileName = getConfiguration().getFileName();
         String bucketName = getConfiguration().getBucketName();
+        String doneFileName = getConfiguration().getDoneFileName();
         Queue<Exchange> exchanges;
 
-        if (fileName != null) {
+        if (shouldSkipCauseDoneFileIsConfiguredButMissing(bucketName, 
doneFileName)) {
+            exchanges = new LinkedList<>();
+        } else if (fileName != null) {
             LOG.trace("Getting object in bucket [{}] with file name [{}]...", 
bucketName, fileName);
 
             ResponseInputStream<GetObjectResponse> s3Object
@@ -153,6 +158,22 @@ public class AWS2S3Consumer extends 
ScheduledBatchPollingConsumer {
         return processBatch(CastUtils.cast(exchanges));
     }
 
+    private boolean shouldSkipCauseDoneFileIsConfiguredButMissing(String 
bucketName, String doneFileName) {
+        if (doneFileName == null) {
+            return false;
+        } else {
+            HeadObjectRequest.Builder headObjectsRequest = 
HeadObjectRequest.builder();
+            headObjectsRequest.bucket(bucketName);
+            headObjectsRequest.key(doneFileName);
+            try {
+                getAmazonS3Client().headObject(headObjectsRequest.build());
+                return false;
+            } catch(NoSuchKeyException e) {
+                return true;
+            }
+        }
+    }
+
     protected Queue<Exchange> 
createExchanges(ResponseInputStream<GetObjectResponse> s3Object, String key) {
         Queue<Exchange> answer = new LinkedList<>();
         Exchange exchange = getEndpoint().createExchange(s3Object, key);
@@ -190,7 +211,7 @@ public class AWS2S3Consumer extends 
ScheduledBatchPollingConsumer {
                     Exchange exchange = getEndpoint().createExchange(s3Object, 
s3ObjectSummary.key());
                     answer.add(exchange);
                 } else {
-                    // If includeFolders != true and the object is not 
included, it is safe to close the object here. 
+                    // If includeFolders != true and the object is not 
included, it is safe to close the object here.
                     // If includeFolders == true, the exchange will close the 
object.
                     IOHelper.close(s3Object);
                 }

Reply via email to