Author: cmueller Date: Mon Jul 23 21:36:13 2012 New Revision: 1364807 URL: http://svn.apache.org/viewvc?rev=1364807&view=rev Log: CAMEL-5343: Improve Camel AWS-S3 to be able to only pick up specific files
Added: camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ConsumerPrefixTest.java Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentConfigurationTest.java Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java?rev=1364807&r1=1364806&r2=1364807&view=diff ============================================================================== --- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java (original) +++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java Mon Jul 23 21:36:13 2012 @@ -29,6 +29,7 @@ public class S3Configuration implements private AmazonS3Client amazonS3Client; private String bucketName; + private String prefix; private String region; private boolean deleteAfterRead = true; private String amazonS3Endpoint; @@ -67,6 +68,14 @@ public class S3Configuration implements this.amazonS3Client = amazonS3Client; } + public String getPrefix() { + return prefix; + } + + public void setPrefix(String prefix) { + this.prefix = prefix; + } + public String getBucketName() { return bucketName; } Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java?rev=1364807&r1=1364806&r2=1364807&view=diff ============================================================================== --- 
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java (original) +++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java Mon Jul 23 21:36:13 2012 @@ -62,6 +62,7 @@ public class S3Consumer extends Schedule ListObjectsRequest listObjectsRequest = new ListObjectsRequest(); listObjectsRequest.setBucketName(bucketName); + listObjectsRequest.setPrefix(getConfiguration().getPrefix()); listObjectsRequest.setMaxKeys(maxMessagesPerPoll); ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest); Modified: camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentConfigurationTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentConfigurationTest.java?rev=1364807&r1=1364806&r2=1364807&view=diff ============================================================================== --- camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentConfigurationTest.java (original) +++ camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentConfigurationTest.java Mon Jul 23 21:36:13 2012 @@ -37,6 +37,7 @@ public class S3ComponentConfigurationTes assertEquals(10, endpoint.getMaxMessagesPerPoll()); assertNull(endpoint.getConfiguration().getAmazonS3Endpoint()); assertNull(endpoint.getConfiguration().getPolicy()); + assertNull(endpoint.getConfiguration().getPrefix()); } @Test @@ -57,6 +58,7 @@ public class S3ComponentConfigurationTes assertEquals(10, endpoint.getMaxMessagesPerPoll()); assertNull(endpoint.getConfiguration().getAmazonS3Endpoint()); assertNull(endpoint.getConfiguration().getPolicy()); + assertNull(endpoint.getConfiguration().getPrefix()); } @Test @@ -65,7 +67,8 @@ public class S3ComponentConfigurationTes S3Endpoint endpoint = (S3Endpoint) 
component.createEndpoint("aws-s3://MyBucket?amazonS3Endpoint=sns.eu-west-1.amazonaws.com" + "&accessKey=xxx&secretKey=yyy&region=us-west-1&deleteAfterRead=false&maxMessagesPerPoll=1&policy=%7B%22Version%22%3A%222008-10-17%22,%22Id%22%3A%22Policy4324355464%22," + "%22Statement%22%3A%5B%7B%22Sid%22%3A%22Stmt456464646477%22,%22Action%22%3A%5B%22s3%3AGetObject%22%5D,%22Effect%22%3A%22Allow%22," - + "%22Resource%22%3A%5B%22arn%3Aaws%3As3%3A%3A%3Amybucket/some/path/*%22%5D,%22Principal%22%3A%7B%22AWS%22%3A%5B%22*%22%5D%7D%7D%5D%7D&storageClass=REDUCED_REDUNDANCY"); + + "%22Resource%22%3A%5B%22arn%3Aaws%3As3%3A%3A%3Amybucket/some/path/*%22%5D,%22Principal%22%3A%7B%22AWS%22%3A%5B%22*%22%5D%7D%7D%5D%7D&storageClass=REDUCED_REDUNDANCY" + + "&prefix=confidential"); assertEquals("MyBucket", endpoint.getConfiguration().getBucketName()); assertEquals("xxx", endpoint.getConfiguration().getAccessKey()); @@ -78,6 +81,7 @@ public class S3ComponentConfigurationTes assertEquals("{\"Version\":\"2008-10-17\",\"Id\":\"Policy4324355464\",\"Statement\":[{\"Sid\":\"Stmt456464646477\",\"Action\":[\"s3:GetObject\"],\"Effect\":\"Allow\",\"Resource\":" + "[\"arn:aws:s3:::mybucket/some/path/*\"],\"Principal\":{\"AWS\":[\"*\"]}}]}", endpoint.getConfiguration().getPolicy()); assertEquals("REDUCED_REDUNDANCY", endpoint.getConfiguration().getStorageClass()); + assertEquals("confidential", endpoint.getConfiguration().getPrefix()); } @Test(expected = IllegalArgumentException.class) Added: camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ConsumerPrefixTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ConsumerPrefixTest.java?rev=1364807&view=auto ============================================================================== --- camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ConsumerPrefixTest.java (added) +++ 
camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ConsumerPrefixTest.java Mon Jul 23 21:36:13 2012 @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.s3; + +import java.io.UnsupportedEncodingException; +import java.util.concurrent.atomic.AtomicInteger; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.amazonaws.util.StringInputStream; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +/** + * Test to verify that the S3 consumer passes the configured prefix option to the + * bucket listing request and consumes the object returned by that listing. 
+ */
+public class S3ConsumerPrefixTest extends CamelTestSupport {
+
+    /**
+     * Expects a message carrying the stubbed object content to arrive at
+     * {@code mock:result} once the route below has polled the dummy client.
+     */
+    @Test
+    public void testConsumePrefixedMessages() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+
+        assertMockEndpointsSatisfied();
+        assertEquals("Camel rocks!", mock.getExchanges().get(0).getIn().getBody(String.class));
+    }
+
+    /**
+     * Binds the stubbed client under the name the route URI refers to
+     * ({@code #amazonS3Client}).
+     */
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry registry = super.createRegistry();
+
+        registry.bind("amazonS3Client", new DummyAmazonS3Client());
+
+        return registry;
+    }
+
+    /**
+     * Builds a route that polls {@code mycamelbucket} with
+     * {@code prefix=confidential} and forwards every exchange to {@code mock:result}.
+     */
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // Options must be separated by '&'; otherwise "region" is swallowed into
+                // the value of the amazonS3Client reference.
+                from("aws-s3://mycamelbucket?amazonS3Client=#amazonS3Client&region=us-west-1&delay=50"
+                    + "&maxMessagesPerPoll=5&prefix=confidential")
+                    .to("mock:result");
+            }
+        };
+    }
+
+    /**
+     * Stub of the S3 client: checks the incoming requests and answers every listing
+     * with one object whose key is {@code key} and whose content is {@code Camel rocks!}.
+     */
+    class DummyAmazonS3Client extends AmazonS3Client {
+
+        /** Number of listObjects calls seen so far; only used in assertion messages. */
+        private final AtomicInteger requestCount = new AtomicInteger(0);
+
+        public DummyAmazonS3Client() {
+            super(new BasicAWSCredentials("myAccessKey", "mySecretKey"));
+        }
+
+        /**
+         * Verifies bucket name and prefix of the request, then returns a listing
+         * containing exactly one object summary.
+         */
+        @Override
+        public ObjectListing listObjects(ListObjectsRequest request) throws AmazonClientException, AmazonServiceException {
+            int currentRequestCount = requestCount.incrementAndGet();
+
+            assertEquals("mycamelbucket", request.getBucketName());
+            // Check the prefix on every request: the mock expectation is already met by the
+            // object returned for the first request, so a check that only fires on a later
+            // request may never influence the test result.
+            assertEquals("listObjects request #" + currentRequestCount + " must carry the configured prefix",
+                    "confidential", request.getPrefix());
+
+            ObjectListing response = new ObjectListing();
+            response.setBucketName(request.getBucketName());
+            response.setPrefix(request.getPrefix());
+
+            S3ObjectSummary s3ObjectSummary = new S3ObjectSummary();
+            s3ObjectSummary.setBucketName(request.getBucketName());
+            s3ObjectSummary.setKey("key");
+            response.getObjectSummaries().add(s3ObjectSummary);
+
+            return response;
+        }
+
+        /**
+         * Verifies that the object announced by {@link #listObjects} is requested and
+         * returns it with the fixed content {@code Camel rocks!}.
+         *
+         * @throws AmazonClientException if the content stream cannot be created
+         */
+        @Override
+        public S3Object getObject(String bucketName, String key) throws AmazonClientException, AmazonServiceException {
+            assertEquals("mycamelbucket", bucketName);
+            assertEquals("key", key);
+
+            S3Object s3Object = new S3Object();
+            s3Object.setBucketName(bucketName);
+            s3Object.setKey(key);
+            try {
+                s3Object.setObjectContent(new StringInputStream("Camel rocks!"));
+            } catch (UnsupportedEncodingException e) {
+                // Surface the failure instead of handing back an object without content.
+                throw new AmazonClientException("Unable to create the stubbed object content", e);
+            }
+
+            return s3Object;
+        }
+
+        @Override
+        public void deleteObject(String bucketName, String key) throws AmazonClientException, AmazonServiceException {
+            // noop: the stub stores nothing, so there is nothing to delete
+        }
+    }
+}
\ No newline at end of file