CAMEL-9784: AWS S3 consumer should keep polling if deleteAfterRead is false; 
otherwise it polls data only once and then never again.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9f16e397
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9f16e397
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9f16e397

Branch: refs/heads/master
Commit: 9f16e397cb8823f9317bd08a7f0f87ee05e1d5b8
Parents: 3b4b522
Author: Claus Ibsen <davscl...@apache.org>
Authored: Thu Apr 28 09:02:28 2016 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Thu Apr 28 09:05:44 2016 +0200

----------------------------------------------------------------------
 components/camel-aws/src/main/docs/aws-s3.adoc  |  4 +-
 .../camel/component/aws/s3/S3Configuration.java | 12 ++--
 .../camel/component/aws/s3/S3Consumer.java      | 72 +++++++++-----------
 3 files changed, 42 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9f16e397/components/camel-aws/src/main/docs/aws-s3.adoc
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/docs/aws-s3.adoc 
b/components/camel-aws/src/main/docs/aws-s3.adoc
index 6bf206a..e291580 100644
--- a/components/camel-aws/src/main/docs/aws-s3.adoc
+++ b/components/camel-aws/src/main/docs/aws-s3.adoc
@@ -38,6 +38,7 @@ The AWS S3 Storage Service component has no options.
 
 
 
+
 // endpoint options: START
 The AWS S3 Storage Service component supports 38 endpoint options which are 
listed below:
 
@@ -53,7 +54,7 @@ The AWS S3 Storage Service component supports 38 endpoint 
options which are list
 | proxyPort | common |  | Integer | Camel 2.16: Specify a proxy port to be 
used inside the client definition.
 | secretKey | common |  | String | Amazon AWS Secret Key
 | bridgeErrorHandler | consumer | false | boolean | Allows for bridging the 
consumer to the Camel routing Error Handler which mean any exceptions occurred 
while the consumer is trying to pickup incoming messages or the likes will now 
be processed as a message and handled by the routing Error Handler. By default 
the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with 
exceptions that will be logged at WARN/ERROR level and ignored.
-| deleteAfterRead | consumer | true | boolean | Delete objects from S3 after 
they have been retrieved. The delete is only performed if the Exchange is 
committed. If a rollback occurs the object is not deleted.
+| deleteAfterRead | consumer | true | boolean | Delete objects from S3 after 
they have been retrieved. The delete is only performed if the Exchange is 
committed. If a rollback occurs the object is not deleted. If this option is 
false then the same objects will be retrieved over and over again on the polls. 
Therefore you need to use the Idempotent Consumer EIP in the route to filter 
out duplicates. You can filter using the S3Constants#BUCKET_NAME and 
S3Constants#KEY headers or only the S3Constants#KEY header.
 | fileName | consumer |  | String | To get the object from the bucket with the 
given file name
 | includeBody | consumer | true | boolean | Camel 2.17: If it is true the 
exchange body will be set to a stream to the contents of the file. If false the 
headers will be set with the S3 object metadata but the body will be null.
 | maxMessagesPerPoll | consumer | 10 | int | Gets the maximum number of 
messages as a limit to poll at each polling. Is default unlimited but use 0 or 
negative number to disable it as unlimited.
@@ -88,6 +89,7 @@ The AWS S3 Storage Service component supports 38 endpoint 
options which are list
 
 
 
+
 |=======================================================================
 
 Required S3 component options

http://git-wip-us.apache.org/repos/asf/camel/blob/9f16e397/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
index 1057763..83c33d2 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
@@ -171,10 +171,6 @@ public class S3Configuration implements Cloneable {
         this.region = region;
     }
 
-    public boolean isDeleteAfterRead() {
-        return deleteAfterRead;
-    }
-
     /**
      * *Camel 2.17*: If it is true, the exchange body will be set to a stream 
to the contents of the file.
      * If false, the headers will be set with the S3 object metadata, but the 
body will be null.
@@ -187,9 +183,17 @@ public class S3Configuration implements Cloneable {
         return includeBody;
     }
 
+    public boolean isDeleteAfterRead() {
+        return deleteAfterRead;
+    }
+
     /**
      * Delete objects from S3 after they have been retrieved.  The delete is 
only performed if the Exchange is committed.
      * If a rollback occurs, the object is not deleted.
+     * <p/>
+     * If this option is false, then the same objects will be retrieved over 
and over again on the polls. Therefore you
+     * need to use the Idempotent Consumer EIP in the route to filter out 
duplicates. You can filter using the
+     * {@link S3Constants#BUCKET_NAME} and {@link S3Constants#KEY} headers, or 
only the {@link S3Constants#KEY} header.
      */
     public void setDeleteAfterRead(boolean deleteAfterRead) {
         this.deleteAfterRead = deleteAfterRead;

http://git-wip-us.apache.org/repos/asf/camel/blob/9f16e397/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
index eab0508..5fb4936 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
@@ -43,13 +43,11 @@ import org.slf4j.LoggerFactory;
 /**
  * A Consumer of messages from the Amazon Web Service Simple Storage Service
  * <a href="http://aws.amazon.com/s3/";>AWS S3</a>
- * 
  */
 public class S3Consumer extends ScheduledBatchPollingConsumer {
     
     private static final Logger LOG = 
LoggerFactory.getLogger(S3Consumer.class);
     private String marker;
-    private boolean filesConsumed;
     private transient String s3ConsumerToString;
 
     public S3Consumer(S3Endpoint endpoint, Processor processor) throws 
NoFactoryAvailableException {
@@ -66,48 +64,40 @@ public class S3Consumer extends 
ScheduledBatchPollingConsumer {
         String bucketName = getConfiguration().getBucketName();
         Queue<Exchange> exchanges;
         
-        if (filesConsumed) {
-            exchanges = new LinkedList<Exchange>();
+        if (fileName != null) {
+            LOG.trace("Getting object in bucket [{}] with file name [{}]...", 
bucketName, fileName);
+
+            S3Object s3Object = getAmazonS3Client().getObject(new 
GetObjectRequest(bucketName, fileName));
+            exchanges = createExchanges(s3Object);
         } else {
-            if (fileName != null) {
-                LOG.trace("Getting object in bucket [{}] with file name 
[{}]...", bucketName, fileName);
-    
-                S3Object s3Object = getAmazonS3Client().getObject(new 
GetObjectRequest(bucketName, fileName));
-                exchanges = createExchanges(s3Object);
-                if (!getConfiguration().isDeleteAfterRead()) {
-                    filesConsumed = true;
-                }
+            LOG.trace("Queueing objects in bucket [{}]...", bucketName);
+
+            ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
+            listObjectsRequest.setBucketName(bucketName);
+            listObjectsRequest.setPrefix(getConfiguration().getPrefix());
+            if (maxMessagesPerPoll > 0) {
+                listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
+            }
+            // if there was a marker from previous poll then use that to 
continue from where we left last time
+            if (marker != null) {
+                LOG.trace("Resuming from marker: {}", marker);
+                listObjectsRequest.setMarker(marker);
+            }
+
+            ObjectListing listObjects = 
getAmazonS3Client().listObjects(listObjectsRequest);
+            if (listObjects.isTruncated()) {
+                marker = listObjects.getNextMarker();
+                LOG.trace("Returned list is truncated, so setting next marker: 
{}", marker);
             } else {
-                LOG.trace("Queueing objects in bucket [{}]...", bucketName);
-            
-                ListObjectsRequest listObjectsRequest = new 
ListObjectsRequest();
-                listObjectsRequest.setBucketName(bucketName);
-                listObjectsRequest.setPrefix(getConfiguration().getPrefix());
-                if (maxMessagesPerPoll > 0) {
-                    listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
-                }
-                if (marker != null && !getConfiguration().isDeleteAfterRead()) 
{
-                    listObjectsRequest.setMarker(marker);
-                }
-            
-                ObjectListing listObjects = 
getAmazonS3Client().listObjects(listObjectsRequest);
-                // we only setup the marker if the file is not deleted
-                if (!getConfiguration().isDeleteAfterRead()) {
-                    // if the marker is truncated, the nextMarker should not 
be null
-                    if (listObjects.getNextMarker() != null) {
-                        marker = listObjects.getNextMarker();
-                    } else {
-                        // if there is no marker, the files are consumed, we 
should not pull it again
-                        filesConsumed = true;
-                    }
-                }
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Found {} objects in bucket [{}]...", 
listObjects.getObjectSummaries().size(), bucketName);
-                }
-            
-                exchanges = createExchanges(listObjects.getObjectSummaries());
+                // no more data so clear marker
+                marker = null;
             }
-        }    
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Found {} objects in bucket [{}]...", 
listObjects.getObjectSummaries().size(), bucketName);
+            }
+
+            exchanges = createExchanges(listObjects.getObjectSummaries());
+        }
         return processBatch(CastUtils.cast(exchanges));
     }
     

Reply via email to