CAMEL-9784: The AWS S3 consumer should keep polling if deleteAfterRead is false; otherwise it polls data only once and then never again.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9f16e397 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9f16e397 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9f16e397 Branch: refs/heads/master Commit: 9f16e397cb8823f9317bd08a7f0f87ee05e1d5b8 Parents: 3b4b522 Author: Claus Ibsen <davscl...@apache.org> Authored: Thu Apr 28 09:02:28 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Apr 28 09:05:44 2016 +0200 ---------------------------------------------------------------------- components/camel-aws/src/main/docs/aws-s3.adoc | 4 +- .../camel/component/aws/s3/S3Configuration.java | 12 ++-- .../camel/component/aws/s3/S3Consumer.java | 72 +++++++++----------- 3 files changed, 42 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/9f16e397/components/camel-aws/src/main/docs/aws-s3.adoc ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/docs/aws-s3.adoc b/components/camel-aws/src/main/docs/aws-s3.adoc index 6bf206a..e291580 100644 --- a/components/camel-aws/src/main/docs/aws-s3.adoc +++ b/components/camel-aws/src/main/docs/aws-s3.adoc @@ -38,6 +38,7 @@ The AWS S3 Storage Service component has no options. + // endpoint options: START The AWS S3 Storage Service component supports 38 endpoint options which are listed below: @@ -53,7 +54,7 @@ The AWS S3 Storage Service component supports 38 endpoint options which are list | proxyPort | common | | Integer | Camel 2.16: Specify a proxy port to be used inside the client definition. 
| secretKey | common | | String | Amazon AWS Secret Key | bridgeErrorHandler | consumer | false | boolean | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN/ERROR level and ignored. -| deleteAfterRead | consumer | true | boolean | Delete objects from S3 after they have been retrieved. The delete is only performed if the Exchange is committed. If a rollback occurs the object is not deleted. +| deleteAfterRead | consumer | true | boolean | Delete objects from S3 after they have been retrieved. The delete is only performed if the Exchange is committed. If a rollback occurs the object is not deleted. If this option is false then the same objects will be retrieve over and over again on the polls. Therefore you need to use the Idempotent Consumer EIP in the route to filter out duplicates. You can filter using the link S3ConstantsBUCKET_NAME and link S3ConstantsKEY headers or only the link S3ConstantsKEY header. | fileName | consumer | | String | To get the object from the bucket with the given file name | includeBody | consumer | true | boolean | Camel 2.17: If it is true the exchange body will be set to a stream to the contents of the file. If false the headers will be set with the S3 object metadata but the body will be null. | maxMessagesPerPoll | consumer | 10 | int | Gets the maximum number of messages as a limit to poll at each polling. Is default unlimited but use 0 or negative number to disable it as unlimited. 
@@ -88,6 +89,7 @@ The AWS S3 Storage Service component supports 38 endpoint options which are list + |======================================================================= Required S3 component options http://git-wip-us.apache.org/repos/asf/camel/blob/9f16e397/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java index 1057763..83c33d2 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java @@ -171,10 +171,6 @@ public class S3Configuration implements Cloneable { this.region = region; } - public boolean isDeleteAfterRead() { - return deleteAfterRead; - } - /** * *Camel 2.17*: If it is true, the exchange body will be set to a stream to the contents of the file. * If false, the headers will be set with the S3 object metadata, but the body will be null. @@ -187,9 +183,17 @@ public class S3Configuration implements Cloneable { return includeBody; } + public boolean isDeleteAfterRead() { + return deleteAfterRead; + } + /** * Delete objects from S3 after they have been retrieved. The delete is only performed if the Exchange is committed. * If a rollback occurs, the object is not deleted. + * <p/> + * If this option is false, then the same objects will be retrieve over and over again on the polls. Therefore you + * need to use the Idempotent Consumer EIP in the route to filter out duplicates. You can filter using the + * {@link S3Constants#BUCKET_NAME} and {@link S3Constants#KEY} headers, or only the {@link S3Constants#KEY} header. 
*/ public void setDeleteAfterRead(boolean deleteAfterRead) { this.deleteAfterRead = deleteAfterRead; http://git-wip-us.apache.org/repos/asf/camel/blob/9f16e397/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java index eab0508..5fb4936 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java @@ -43,13 +43,11 @@ import org.slf4j.LoggerFactory; /** * A Consumer of messages from the Amazon Web Service Simple Storage Service * <a href="http://aws.amazon.com/s3/">AWS S3</a> - * */ public class S3Consumer extends ScheduledBatchPollingConsumer { private static final Logger LOG = LoggerFactory.getLogger(S3Consumer.class); private String marker; - private boolean filesConsumed; private transient String s3ConsumerToString; public S3Consumer(S3Endpoint endpoint, Processor processor) throws NoFactoryAvailableException { @@ -66,48 +64,40 @@ public class S3Consumer extends ScheduledBatchPollingConsumer { String bucketName = getConfiguration().getBucketName(); Queue<Exchange> exchanges; - if (filesConsumed) { - exchanges = new LinkedList<Exchange>(); + if (fileName != null) { + LOG.trace("Getting object in bucket [{}] with file name [{}]...", bucketName, fileName); + + S3Object s3Object = getAmazonS3Client().getObject(new GetObjectRequest(bucketName, fileName)); + exchanges = createExchanges(s3Object); } else { - if (fileName != null) { - LOG.trace("Getting object in bucket [{}] with file name [{}]...", bucketName, fileName); - - S3Object s3Object = getAmazonS3Client().getObject(new GetObjectRequest(bucketName, fileName)); - exchanges = createExchanges(s3Object); - if 
(!getConfiguration().isDeleteAfterRead()) { - filesConsumed = true; - } + LOG.trace("Queueing objects in bucket [{}]...", bucketName); + + ListObjectsRequest listObjectsRequest = new ListObjectsRequest(); + listObjectsRequest.setBucketName(bucketName); + listObjectsRequest.setPrefix(getConfiguration().getPrefix()); + if (maxMessagesPerPoll > 0) { + listObjectsRequest.setMaxKeys(maxMessagesPerPoll); + } + // if there was a marker from previous poll then use that to continue from where we left last time + if (marker != null) { + LOG.trace("Resuming from marker: {}", marker); + listObjectsRequest.setMarker(marker); + } + + ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest); + if (listObjects.isTruncated()) { + marker = listObjects.getNextMarker(); + LOG.trace("Returned list is truncated, so setting next marker: {}", marker); } else { - LOG.trace("Queueing objects in bucket [{}]...", bucketName); - - ListObjectsRequest listObjectsRequest = new ListObjectsRequest(); - listObjectsRequest.setBucketName(bucketName); - listObjectsRequest.setPrefix(getConfiguration().getPrefix()); - if (maxMessagesPerPoll > 0) { - listObjectsRequest.setMaxKeys(maxMessagesPerPoll); - } - if (marker != null && !getConfiguration().isDeleteAfterRead()) { - listObjectsRequest.setMarker(marker); - } - - ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest); - // we only setup the marker if the file is not deleted - if (!getConfiguration().isDeleteAfterRead()) { - // if the marker is truncated, the nextMarker should not be null - if (listObjects.getNextMarker() != null) { - marker = listObjects.getNextMarker(); - } else { - // if there is no marker, the files are consumed, we should not pull it again - filesConsumed = true; - } - } - if (LOG.isTraceEnabled()) { - LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(), bucketName); - } - - exchanges = createExchanges(listObjects.getObjectSummaries()); + // no 
more data so clear marker + marker = null; } - } + if (LOG.isTraceEnabled()) { + LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(), bucketName); + } + + exchanges = createExchanges(listObjects.getObjectSummaries()); + } return processBatch(CastUtils.cast(exchanges)); }