Repository: camel
Updated Branches:
  refs/heads/camel-2.16.x 405542810 -> 9216caf3d
  refs/heads/camel-2.17.x 45317409d -> 7a72ca610


CAMEL-9784: aws s3 consumer should keep polling if deleteAfterRead is false, 
otherwise it only polls data one time and then never again.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7a72ca61
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7a72ca61
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7a72ca61

Branch: refs/heads/camel-2.17.x
Commit: 7a72ca610b2e57c12aab808d558c54a18acb3026
Parents: 4531740
Author: Claus Ibsen <davscl...@apache.org>
Authored: Thu Apr 28 09:02:28 2016 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Thu Apr 28 09:03:54 2016 +0200

----------------------------------------------------------------------
 .../camel/component/aws/s3/S3Configuration.java | 12 ++--
 .../camel/component/aws/s3/S3Consumer.java      | 72 +++++++++-----------
 2 files changed, 39 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/7a72ca61/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
index 2326478..1dcc3c0 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
@@ -175,10 +175,6 @@ public class S3Configuration implements Cloneable {
         this.region = region;
     }
 
-    public boolean isDeleteAfterRead() {
-        return deleteAfterRead;
-    }
-
     /**
      * *Camel 2.17*: If it is true, the exchange body will be set to a stream 
to the contents of the file.
      * If false, the headers will be set with the S3 object metadata, but the 
body will be null.
@@ -191,9 +187,17 @@ public class S3Configuration implements Cloneable {
         return includeBody;
     }
 
+    public boolean isDeleteAfterRead() {
+        return deleteAfterRead;
+    }
+
     /**
      * Delete objects from S3 after they have been retrieved.  The delete is 
only performed if the Exchange is committed.
      * If a rollback occurs, the object is not deleted.
+     * <p/>
+     * If this option is false, then the same objects will be retrieved over 
and over again on the polls. Therefore you
+     * need to use the Idempotent Consumer EIP in the route to filter out 
duplicates. You can filter using the
+     * {@link S3Constants#BUCKET_NAME} and {@link S3Constants#KEY} headers, or 
only the {@link S3Constants#KEY} header.
      */
     public void setDeleteAfterRead(boolean deleteAfterRead) {
         this.deleteAfterRead = deleteAfterRead;

http://git-wip-us.apache.org/repos/asf/camel/blob/7a72ca61/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
index eab0508..5fb4936 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
@@ -43,13 +43,11 @@ import org.slf4j.LoggerFactory;
 /**
  * A Consumer of messages from the Amazon Web Service Simple Storage Service
  * <a href="http://aws.amazon.com/s3/">AWS S3</a>
- * 
  */
 public class S3Consumer extends ScheduledBatchPollingConsumer {
     
     private static final Logger LOG = 
LoggerFactory.getLogger(S3Consumer.class);
     private String marker;
-    private boolean filesConsumed;
     private transient String s3ConsumerToString;
 
     public S3Consumer(S3Endpoint endpoint, Processor processor) throws 
NoFactoryAvailableException {
@@ -66,48 +64,40 @@ public class S3Consumer extends 
ScheduledBatchPollingConsumer {
         String bucketName = getConfiguration().getBucketName();
         Queue<Exchange> exchanges;
         
-        if (filesConsumed) {
-            exchanges = new LinkedList<Exchange>();
+        if (fileName != null) {
+            LOG.trace("Getting object in bucket [{}] with file name [{}]...", 
bucketName, fileName);
+
+            S3Object s3Object = getAmazonS3Client().getObject(new 
GetObjectRequest(bucketName, fileName));
+            exchanges = createExchanges(s3Object);
         } else {
-            if (fileName != null) {
-                LOG.trace("Getting object in bucket [{}] with file name 
[{}]...", bucketName, fileName);
-    
-                S3Object s3Object = getAmazonS3Client().getObject(new 
GetObjectRequest(bucketName, fileName));
-                exchanges = createExchanges(s3Object);
-                if (!getConfiguration().isDeleteAfterRead()) {
-                    filesConsumed = true;
-                }
+            LOG.trace("Queueing objects in bucket [{}]...", bucketName);
+
+            ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
+            listObjectsRequest.setBucketName(bucketName);
+            listObjectsRequest.setPrefix(getConfiguration().getPrefix());
+            if (maxMessagesPerPoll > 0) {
+                listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
+            }
+            // if there was a marker from previous poll then use that to 
continue from where we left last time
+            if (marker != null) {
+                LOG.trace("Resuming from marker: {}", marker);
+                listObjectsRequest.setMarker(marker);
+            }
+
+            ObjectListing listObjects = 
getAmazonS3Client().listObjects(listObjectsRequest);
+            if (listObjects.isTruncated()) {
+                marker = listObjects.getNextMarker();
+                LOG.trace("Returned list is truncated, so setting next marker: 
{}", marker);
             } else {
-                LOG.trace("Queueing objects in bucket [{}]...", bucketName);
-            
-                ListObjectsRequest listObjectsRequest = new 
ListObjectsRequest();
-                listObjectsRequest.setBucketName(bucketName);
-                listObjectsRequest.setPrefix(getConfiguration().getPrefix());
-                if (maxMessagesPerPoll > 0) {
-                    listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
-                }
-                if (marker != null && !getConfiguration().isDeleteAfterRead()) 
{
-                    listObjectsRequest.setMarker(marker);
-                }
-            
-                ObjectListing listObjects = 
getAmazonS3Client().listObjects(listObjectsRequest);
-                // we only setup the marker if the file is not deleted
-                if (!getConfiguration().isDeleteAfterRead()) {
-                    // if the marker is truncated, the nextMarker should not 
be null
-                    if (listObjects.getNextMarker() != null) {
-                        marker = listObjects.getNextMarker();
-                    } else {
-                        // if there is no marker, the files are consumed, we 
should not pull it again
-                        filesConsumed = true;
-                    }
-                }
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Found {} objects in bucket [{}]...", 
listObjects.getObjectSummaries().size(), bucketName);
-                }
-            
-                exchanges = createExchanges(listObjects.getObjectSummaries());
+                // no more data so clear marker
+                marker = null;
             }
-        }    
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Found {} objects in bucket [{}]...", 
listObjects.getObjectSummaries().size(), bucketName);
+            }
+
+            exchanges = createExchanges(listObjects.getObjectSummaries());
+        }
         return processBatch(CastUtils.cast(exchanges));
     }
     

Reply via email to