CAMEL-9784: aws s3 consumer should keep polling if deleteAfterRead is false, otherwise it only polls data once and then never again.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9216caf3 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9216caf3 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9216caf3 Branch: refs/heads/camel-2.16.x Commit: 9216caf3d1983d1b661d27deaf633fcfc6a59c09 Parents: 4055428 Author: Claus Ibsen <davscl...@apache.org> Authored: Thu Apr 28 09:02:28 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Apr 28 09:05:07 2016 +0200 ---------------------------------------------------------------------- .../camel/component/aws/s3/S3Configuration.java | 7 +- .../camel/component/aws/s3/S3Consumer.java | 72 +++++++++----------- 2 files changed, 37 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/9216caf3/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java index 36bab52..97d4e40 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java @@ -176,7 +176,12 @@ public class S3Configuration implements Cloneable { } /** - * Delete objects from S3 after it has been retrieved. + * Delete objects from S3 after they have been retrieved. The delete is only performed if the Exchange is committed. + * If a rollback occurs, the object is not deleted. + * <p/> + * If this option is false, then the same objects will be retrieved over and over again on the polls. Therefore you + * need to use the Idempotent Consumer EIP in the route to filter out duplicates.
You can filter using the + * {@link S3Constants#BUCKET_NAME} and {@link S3Constants#KEY} headers, or only the {@link S3Constants#KEY} header. */ public void setDeleteAfterRead(boolean deleteAfterRead) { this.deleteAfterRead = deleteAfterRead; http://git-wip-us.apache.org/repos/asf/camel/blob/9216caf3/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java index eab0508..5fb4936 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java @@ -43,13 +43,11 @@ import org.slf4j.LoggerFactory; /** * A Consumer of messages from the Amazon Web Service Simple Storage Service * <a href="http://aws.amazon.com/s3/">AWS S3</a> - * */ public class S3Consumer extends ScheduledBatchPollingConsumer { private static final Logger LOG = LoggerFactory.getLogger(S3Consumer.class); private String marker; - private boolean filesConsumed; private transient String s3ConsumerToString; public S3Consumer(S3Endpoint endpoint, Processor processor) throws NoFactoryAvailableException { @@ -66,48 +64,40 @@ public class S3Consumer extends ScheduledBatchPollingConsumer { String bucketName = getConfiguration().getBucketName(); Queue<Exchange> exchanges; - if (filesConsumed) { - exchanges = new LinkedList<Exchange>(); + if (fileName != null) { + LOG.trace("Getting object in bucket [{}] with file name [{}]...", bucketName, fileName); + + S3Object s3Object = getAmazonS3Client().getObject(new GetObjectRequest(bucketName, fileName)); + exchanges = createExchanges(s3Object); } else { - if (fileName != null) { - LOG.trace("Getting object in bucket [{}] with file name [{}]...", bucketName, fileName); - 
- S3Object s3Object = getAmazonS3Client().getObject(new GetObjectRequest(bucketName, fileName)); - exchanges = createExchanges(s3Object); - if (!getConfiguration().isDeleteAfterRead()) { - filesConsumed = true; - } + LOG.trace("Queueing objects in bucket [{}]...", bucketName); + + ListObjectsRequest listObjectsRequest = new ListObjectsRequest(); + listObjectsRequest.setBucketName(bucketName); + listObjectsRequest.setPrefix(getConfiguration().getPrefix()); + if (maxMessagesPerPoll > 0) { + listObjectsRequest.setMaxKeys(maxMessagesPerPoll); + } + // if there was a marker from previous poll then use that to continue from where we left last time + if (marker != null) { + LOG.trace("Resuming from marker: {}", marker); + listObjectsRequest.setMarker(marker); + } + + ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest); + if (listObjects.isTruncated()) { + marker = listObjects.getNextMarker(); + LOG.trace("Returned list is truncated, so setting next marker: {}", marker); } else { - LOG.trace("Queueing objects in bucket [{}]...", bucketName); - - ListObjectsRequest listObjectsRequest = new ListObjectsRequest(); - listObjectsRequest.setBucketName(bucketName); - listObjectsRequest.setPrefix(getConfiguration().getPrefix()); - if (maxMessagesPerPoll > 0) { - listObjectsRequest.setMaxKeys(maxMessagesPerPoll); - } - if (marker != null && !getConfiguration().isDeleteAfterRead()) { - listObjectsRequest.setMarker(marker); - } - - ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest); - // we only setup the marker if the file is not deleted - if (!getConfiguration().isDeleteAfterRead()) { - // if the marker is truncated, the nextMarker should not be null - if (listObjects.getNextMarker() != null) { - marker = listObjects.getNextMarker(); - } else { - // if there is no marker, the files are consumed, we should not pull it again - filesConsumed = true; - } - } - if (LOG.isTraceEnabled()) { - LOG.trace("Found {} objects in bucket 
[{}]...", listObjects.getObjectSummaries().size(), bucketName); - } - - exchanges = createExchanges(listObjects.getObjectSummaries()); + // no more data so clear marker + marker = null; } - } + if (LOG.isTraceEnabled()) { + LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(), bucketName); + } + + exchanges = createExchanges(listObjects.getObjectSummaries()); + } return processBatch(CastUtils.cast(exchanges)); }