Updated Branches: refs/heads/master f54b77d6c -> cdad658ca
CAMEL-5790 aws-s3 supports retrieving a single object request with thanks to Jason Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/cdad658c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/cdad658c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/cdad658c Branch: refs/heads/master Commit: cdad658ca2a60086245eaed8234c987bbfb35e3d Parents: f54b77d Author: Willem Jiang <ningji...@apache.org> Authored: Wed Oct 30 17:19:36 2013 +0800 Committer: Willem Jiang <ningji...@apache.org> Committed: Wed Oct 30 17:19:36 2013 +0800 ---------------------------------------------------------------------- .../camel/component/aws/s3/S3Configuration.java | 10 ++++++ .../camel/component/aws/s3/S3Consumer.java | 38 ++++++++++++++------ .../camel/component/aws/s3/S3Endpoint.java | 9 ++++- .../component/aws/s3/AmazonS3ClientMock.java | 5 +-- 4 files changed, 49 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/cdad658c/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java index 0cb1579..8cd52e8 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java @@ -29,6 +29,7 @@ public class S3Configuration implements Cloneable { private AmazonS3 amazonS3Client; private String bucketName; + private String fileName; private String prefix; private String region; private boolean deleteAfterRead = true; @@ -85,6 +86,15 @@ public class S3Configuration implements Cloneable { this.bucketName = bucketName; } + + public String getFileName() { + return fileName; + } + + public void setFileName(String fileName) { + this.fileName = fileName; + } + public String getRegion() { return region; } http://git-wip-us.apache.org/repos/asf/camel/blob/cdad658c/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java index 0d628cc..3289d36 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java @@ -22,6 +22,7 @@ import java.util.Queue; import com.amazonaws.AmazonClientException; import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.GetObjectRequest; import com.amazonaws.services.s3.model.ListObjectsRequest; import com.amazonaws.services.s3.model.ObjectListing; import com.amazonaws.services.s3.model.S3Object; @@ -58,24 +59,41 @@ public class S3Consumer extends ScheduledBatchPollingConsumer { shutdownRunningTask = null; pendingExchanges = 0; + String fileName = getConfiguration().getFileName(); String bucketName = getConfiguration().getBucketName(); - LOG.trace("Queueing objects in bucket [{}]...", bucketName); + Queue<Exchange> exchanges = null; + + if (fileName != null) { + LOG.trace("Getting object in bucket [{}] with file name [{}]...", bucketName, fileName); + + S3Object s3Object = getAmazonS3Client().getObject(new GetObjectRequest(bucketName, fileName)); + exchanges = createExchanges(s3Object); + } else { + LOG.trace("Queueing objects in bucket [{}]...", bucketName); - ListObjectsRequest listObjectsRequest = new ListObjectsRequest(); - listObjectsRequest.setBucketName(bucketName); - listObjectsRequest.setPrefix(getConfiguration().getPrefix()); - listObjectsRequest.setMaxKeys(maxMessagesPerPoll); + ListObjectsRequest listObjectsRequest = new ListObjectsRequest(); + listObjectsRequest.setBucketName(bucketName); + listObjectsRequest.setPrefix(getConfiguration().getPrefix()); + listObjectsRequest.setMaxKeys(maxMessagesPerPoll); - ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest); + ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest); - if (LOG.isTraceEnabled()) { - LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(), bucketName); - } + if (LOG.isTraceEnabled()) { + LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(), bucketName); + } - Queue<Exchange> exchanges = createExchanges(listObjects.getObjectSummaries()); + exchanges = createExchanges(listObjects.getObjectSummaries()); + } return processBatch(CastUtils.cast(exchanges)); } + protected Queue<Exchange> createExchanges(S3Object s3Object) { + Queue<Exchange> answer = new LinkedList<Exchange>(); + Exchange exchange = getEndpoint().createExchange(s3Object); + answer.add(exchange); + return answer; + } + protected Queue<Exchange> createExchanges(List<S3ObjectSummary> s3ObjectSummaries) { if (LOG.isTraceEnabled()) { LOG.trace("Received {} messages in this poll", s3ObjectSummaries.size()); http://git-wip-us.apache.org/repos/asf/camel/blob/cdad658c/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java index 64f9315..15d37d1 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java @@ -79,7 +79,14 @@ public class S3Endpoint extends ScheduledPollEndpoint { @Override public void doStart() throws Exception { super.doStart(); - + + String fileName = getConfiguration().getFileName(); + + if (fileName != null) { + LOG.trace("File name [{}] requested, so skipping bucket check...", fileName); + return; + } + String bucketName = getConfiguration().getBucketName(); LOG.trace("Quering whether bucket [{}] already exists...", bucketName); http://git-wip-us.apache.org/repos/asf/camel/blob/cdad658c/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/AmazonS3ClientMock.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/AmazonS3ClientMock.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/AmazonS3ClientMock.java index ba9aabe..33113df 100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/AmazonS3ClientMock.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/AmazonS3ClientMock.java @@ -247,7 +247,8 @@ public class AmazonS3ClientMock extends AmazonS3Client { public ObjectMetadata getObjectMetadata(GetObjectMetadataRequest getObjectMetadataRequest) throws AmazonClientException, AmazonServiceException { throw new UnsupportedOperationException(); } - + + @Override public S3Object getObject(String bucketName, String key) throws AmazonClientException, AmazonServiceException { for (S3Object s3Object : objects) { @@ -271,7 +272,7 @@ public class AmazonS3ClientMock extends AmazonS3Client { @Override public S3Object getObject(GetObjectRequest getObjectRequest) throws AmazonClientException, AmazonServiceException { - throw new UnsupportedOperationException(); + return getObject(getObjectRequest.getBucketName(), getObjectRequest.getKey()); } @Override