Repository: camel
Updated Branches:
  refs/heads/master 7eb9e8321 -> cb9667eac


CAMEL-9507 Add includeBody option to camel-aws S3 consumer


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/cb9667ea
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/cb9667ea
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/cb9667ea

Branch: refs/heads/master
Commit: cb9667eacdbd7336a5311b1649d40adf46c9d4f1
Parents: 7eb9e83
Author: Doug Tung <dwt...@isg.la>
Authored: Wed Feb 3 11:45:54 2016 -0800
Committer: Doug Tung <dwt...@isg.la>
Committed: Fri Feb 5 20:00:10 2016 -0800

----------------------------------------------------------------------
 components/camel-aws/src/main/docs/aws-s3.adoc  |  89 +++++++-------
 .../camel/component/aws/s3/S3Configuration.java |  49 +++++---
 .../camel/component/aws/s3/S3Endpoint.java      |  22 +++-
 .../aws/s3/S3ComponentConfigurationTest.java    |   7 +-
 .../component/aws/s3/S3IncludeBodyTest.java     | 123 +++++++++++++++++++
 5 files changed, 227 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/cb9667ea/components/camel-aws/src/main/docs/aws-s3.adoc
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/docs/aws-s3.adoc 
b/components/camel-aws/src/main/docs/aws-s3.adoc
index d232535..dd207a6 100644
--- a/components/camel-aws/src/main/docs/aws-s3.adoc
+++ b/components/camel-aws/src/main/docs/aws-s3.adoc
@@ -30,53 +30,54 @@ The bucket will be created if it don't already exists. +
 URI Options
 ^^^^^^^^^^^
 
-[width="100%",cols="10%,10%,10%,70%",options="header",]
-|=======================================================================
-|Name |Default Value |Context |Description
-
-|amazonS3Client |`null` |Shared |Reference to a 
`com.amazonaws.services.sqs.AmazonS3` in the
-link:registry.html[Registry].
-
-|accessKey |`null` |Shared |Amazon AWS Access Key
-
-|secretKey |`null` |Shared |Amazon AWS Secret Key
-
-|amazonS3Endpoint |`null` |Shared |The region with which the AWS-S3 client 
wants to work with.
-
-|region |`null` |Producer |The region who the bucket is located. This option 
is used in the
-`com.amazonaws.services.s3.model.CreateBucketRequest`.
-
-|deleteAfterRead |`true` |Consumer |Delete objects from S3 after it has been 
retrieved.
-
-|deleteAfterWrite |`false` |Producer |*Camel 2.11.0* Delete file object after 
the S3 file has been uploaded
-
-|maxMessagesPerPoll |10 |Consumer |The maximum number of objects which can be 
retrieved in one poll. Used
-in in the `com.amazonaws.services.s3.model.ListObjectsRequest`.
+// endpoint options: START
+The AWS S3 Storage Service component supports 38 endpoint options which are 
listed below:
 
-|policy |`null` |Shared |*Camel 2.8.4*: The policy for this queue to set in the
-`com.amazonaws.services.s3.AmazonS3#setBucketPolicy()` method.
-
-|storageClass |`null` |Producer |*Camel 2.8.4*: The storage class to set in the
-`com.amazonaws.services.s3.model.PutObjectRequest` request.
-
-|prefix |`null` |Consumer |*Camel 2.10.1*: The prefix which is used in the
-`com.amazonaws.services.s3.model.ListObjectsRequest` to only consume
-objects we are interested in.
-
-|multiPartUpload |`false` |Producer |*Camel 2.15.0*: If it is true, camel will 
upload the file with multi
-part format, the part size is decided by the option of `partSize`
-
-|partSize |`25 * 1024 * 1024` |Producer |*Camel 2.15.0*: Setup the partSize 
which is used in multi part upload,
-the default size is 25M.
-
-|serverSideEncryption |null |Producer |*Camel 2.16:* Sets the server-side 
encryption algorithm when encrypting
-the object using AWS-managed keys. For example use AES256.
+[width="100%",cols="2s,1,1m,1m,5",options="header"]
+|=======================================================================
+| Name | Group | Default | Java Type | Description
+| bucketName | common |  | String | *Required* Name of the bucket. The bucket 
will be created if it don't already exists.
+| accessKey | common |  | String | Amazon AWS Access Key
+| amazonS3Client | common |  | AmazonS3 | Reference to a 
com.amazonaws.services.sqs.AmazonS3 in the link:registry.htmlRegistry.
+| amazonS3Endpoint | common |  | String | The region with which the AWS-S3 
client wants to work with.
+| policy | common |  | String | Camel 2.8.4: The policy for this queue to set 
in the com.amazonaws.services.s3.AmazonS3setBucketPolicy() method.
+| proxyHost | common |  | String | Camel 2.16: To define a proxy host when 
instantiating the SQS client
+| proxyPort | common |  | Integer | Camel 2.16: Specify a proxy port to be 
used inside the client definition.
+| secretKey | common |  | String | Amazon AWS Secret Key
+| bridgeErrorHandler | consumer | false | boolean | Allows for bridging the 
consumer to the Camel routing Error Handler which mean any exceptions occurred 
while the consumer is trying to pickup incoming messages or the likes will now 
be processed as a message and handled by the routing Error Handler. By default 
the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with 
exceptions that will be logged at WARN/ERROR level and ignored.
+| deleteAfterRead | consumer | true | boolean | Delete objects from S3 after 
they have been retrieved. The delete is only performed if the Exchange is 
committed. If a rollback occurs the object is not deleted.
+| fileName | consumer |  | String | To get the object from the bucket with the 
given file name
+| includeBody | consumer | true | boolean | Camel 2.17: If it is true the 
exchange body will be set to a stream to the contents of the file. If false the 
headers will be set with the S3 object metadata but the body will be null.
+| maxMessagesPerPoll | consumer | 10 | int | Gets the maximum number of 
messages as a limit to poll at each polling. Is default unlimited but use 0 or 
negative number to disable it as unlimited.
+| prefix | consumer |  | String | Camel 2.10.1: The prefix which is used in 
the com.amazonaws.services.s3.model.ListObjectsRequest to only consume objects 
we are interested in.
+| sendEmptyMessageWhenIdle | consumer | false | boolean | If the polling 
consumer did not poll any files you can enable this option to send an empty 
message (no body) instead.
+| exceptionHandler | consumer (advanced) |  | ExceptionHandler | To let the 
consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler 
is enabled then this options is not in use. By default the consumer will deal 
with exceptions that will be logged at WARN/ERROR level and ignored.
+| pollStrategy | consumer (advanced) |  | PollingConsumerPollStrategy | A 
pluggable org.apache.camel.PollingConsumerPollingStrategy allowing you to 
provide your custom implementation to control error handling usually occurred 
during the poll operation before an Exchange have been created and being routed 
in Camel.
+| deleteAfterWrite | producer | false | boolean | Camel 2.11.0: Delete file 
object after the S3 file has been uploaded
+| multiPartUpload | producer | false | boolean | Camel 2.15.0: If it is true 
camel will upload the file with multi part format the part size is decided by 
the option of partSize
+| partSize | producer | 26214400 | long | Camel 2.15.0: Setup the partSize 
which is used in multi part upload the default size is 25M.
+| region | producer |  | String | The region where the bucket is located. This 
option is used in the com.amazonaws.services.s3.model.CreateBucketRequest.
+| serverSideEncryption | producer |  | String | Camel 2.16: Sets the 
server-side encryption algorithm when encrypting the object using AWS-managed 
keys. For example use AES256.
+| storageClass | producer |  | String | Camel 2.8.4: The storage class to set 
in the com.amazonaws.services.s3.model.PutObjectRequest request.
+| exchangePattern | advanced | InOnly | ExchangePattern | Sets the default 
exchange pattern when creating an exchange
+| synchronous | advanced | false | boolean | Sets whether synchronous 
processing should be strictly used or Camel is allowed to use asynchronous 
processing (if supported).
+| backoffErrorThreshold | scheduler |  | int | The number of subsequent error 
polls (failed due some error) that should happen before the backoffMultipler 
should kick-in.
+| backoffIdleThreshold | scheduler |  | int | The number of subsequent idle 
polls that should happen before the backoffMultipler should kick-in.
+| backoffMultiplier | scheduler |  | int | To let the scheduled polling 
consumer backoff if there has been a number of subsequent idles/errors in a 
row. The multiplier is then the number of polls that will be skipped before the 
next actual attempt is happening again. When this option is in use then 
backoffIdleThreshold and/or backoffErrorThreshold must also be configured.
+| delay | scheduler | 500 | long | Milliseconds before the next poll.
+| greedy | scheduler | false | boolean | If greedy is enabled then the 
ScheduledPollConsumer will run immediately again if the previous run polled 1 
or more messages.
+| initialDelay | scheduler | 1000 | long | Milliseconds before the first poll 
starts.
+| runLoggingLevel | scheduler | TRACE | LoggingLevel | The consumer logs a 
start/complete log line when it polls. This option allows you to configure the 
logging level for that.
+| scheduledExecutorService | scheduler |  | ScheduledExecutorService | Allows 
for configuring a custom/shared thread pool to use for the consumer. By default 
each consumer has its own single threaded thread pool.
+| scheduler | scheduler | none | ScheduledPollConsumerScheduler | To use a 
cron scheduler from either camel-spring or camel-quartz2 component
+| schedulerProperties | scheduler |  | Map | To configure additional 
properties when using a custom scheduler or any of the Quartz2 Spring based 
scheduler.
+| startScheduler | scheduler | true | boolean | Whether the scheduler should 
be auto started.
+| timeUnit | scheduler | MILLISECONDS | TimeUnit | Time unit for initialDelay 
and delay options.
+| useFixedDelay | scheduler | true | boolean | Controls if fixed delay or 
fixed rate is used. See ScheduledExecutorService in JDK for details.
+|=======================================================================
+// endpoint options: END
 
-|proxyHost |null |Producer |*Camel 2.16*: Specify a proxy host to be used 
inside the client
-definition.
 
-|proxyPort |null |Producer |*Camel 2.16*: Specify a proxy port to be used 
inside the client
-definition.
 |=======================================================================
 
 Required S3 component options

http://git-wip-us.apache.org/repos/asf/camel/blob/cb9667ea/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
index 36bab52..2326478 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
@@ -60,13 +60,15 @@ public class S3Configuration implements Cloneable {
     private String proxyHost;
     @UriParam
     private Integer proxyPort;
+    @UriParam(label = "consumer", defaultValue = "true")
+    private boolean includeBody = true;
 
     public long getPartSize() {
         return partSize;
     }
 
     /**
-     * Setup the partSize which is used in multi part upload, the default size 
is 25M.
+     * *Camel 2.15.0*: Setup the partSize which is used in multi part upload, 
the default size is 25M.
      */
     public void setPartSize(long partSize) {
         this.partSize = partSize;
@@ -77,7 +79,7 @@ public class S3Configuration implements Cloneable {
     }
 
     /**
-     * If it is true, Camel will upload the file with multi part format, the 
part size is decided by the option of partSize
+     * *Camel 2.15.0*: If it is true, camel will upload the file with multi 
part format, the part size is decided by the option of `partSize`
      */
     public void setMultiPartUpload(boolean multiPartUpload) {
         this.multiPartUpload = multiPartUpload;
@@ -121,7 +123,7 @@ public class S3Configuration implements Cloneable {
     }
 
     /**
-     * To use the AmazonS3 as the client
+     * Reference to a `com.amazonaws.services.sqs.AmazonS3` in the 
link:registry.html[Registry].
      */
     public void setAmazonS3Client(AmazonS3 amazonS3Client) {
         this.amazonS3Client = amazonS3Client;
@@ -132,7 +134,8 @@ public class S3Configuration implements Cloneable {
     }
 
     /**
-     * The prefix which is used in the 
com.amazonaws.services.s3.model.ListObjectsRequest to only consume objects we 
are interested in.
+     * *Camel 2.10.1*: The prefix which is used in the 
com.amazonaws.services.s3.model.ListObjectsRequest
+     * to only consume objects we are interested in.
      */
     public void setPrefix(String prefix) {
         this.prefix = prefix;
@@ -165,7 +168,8 @@ public class S3Configuration implements Cloneable {
     }
 
     /**
-     * The region where the bucket is located.
+     * The region where the bucket is located. This option is used in the
+     * `com.amazonaws.services.s3.model.CreateBucketRequest`.
      */
     public void setRegion(String region) {
         this.region = region;
@@ -176,7 +180,20 @@ public class S3Configuration implements Cloneable {
     }
 
     /**
-     * Delete objects from S3 after it has been retrieved.
+     * *Camel 2.17*: If it is true, the exchange body will be set to a stream 
to the contents of the file.
+     * If false, the headers will be set with the S3 object metadata, but the 
body will be null.
+     */
+    public void setIncludeBody(boolean includeBody) {
+        this.includeBody = includeBody;
+    }
+
+    public boolean isIncludeBody() {
+        return includeBody;
+    }
+
+    /**
+     * Delete objects from S3 after they have been retrieved.  The delete is 
only performed if the Exchange is committed.
+     * If a rollback occurs, the object is not deleted.
      */
     public void setDeleteAfterRead(boolean deleteAfterRead) {
         this.deleteAfterRead = deleteAfterRead;
@@ -187,7 +204,7 @@ public class S3Configuration implements Cloneable {
     }
 
     /**
-     * Delete file object after the S3 file has been uploaded
+     * *Camel 2.11.0*: Delete file object after the S3 file has been uploaded
      */
     public void setDeleteAfterWrite(boolean deleteAfterWrite) {
         this.deleteAfterWrite = deleteAfterWrite;
@@ -198,7 +215,7 @@ public class S3Configuration implements Cloneable {
     }
 
     /**
-     * The policy for this bucket
+     * *Camel 2.8.4*: The policy for this queue to set in the 
`com.amazonaws.services.s3.AmazonS3#setBucketPolicy()` method.
      */
     public void setPolicy(String policy) {
         this.policy = policy;
@@ -209,7 +226,7 @@ public class S3Configuration implements Cloneable {
     }
 
     /**
-     * The storage class
+     * *Camel 2.8.4*: The storage class to set in the 
`com.amazonaws.services.s3.model.PutObjectRequest` request.
      */
     public void setStorageClass(String storageClass) {
         this.storageClass = storageClass;
@@ -220,31 +237,31 @@ public class S3Configuration implements Cloneable {
     }
 
     /**
-     * Sets the server-side encryption algorithm when encrypting the object 
using AWS-managed keys.
+     * *Camel 2.16*: Sets the server-side encryption algorithm when encrypting 
the object using AWS-managed keys.
      * For example use <tt>AES256</tt>.
      */
     public void setServerSideEncryption(String serverSideEncryption) {
         this.serverSideEncryption = serverSideEncryption;
     }
     
-    /**
-     * To define a proxy host when instantiating the SQS client
-     */
     public String getProxyHost() {
         return proxyHost;
     }
 
+    /**
+     * *Camel 2.16*: To define a proxy host when instantiating the SQS client
+     */
     public void setProxyHost(String proxyHost) {
         this.proxyHost = proxyHost;
     }
 
-    /**
-     * To define a proxy port when instantiating the SQS client
-     */
     public Integer getProxyPort() {
         return proxyPort;
     }
 
+    /**
+     * *Camel 2.16*: Specify a proxy port to be used inside the client 
definition.
+     */
     public void setProxyPort(Integer proxyPort) {
         this.proxyPort = proxyPort;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/cb9667ea/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java
index 82273be..5d5ed75 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.aws.s3;
 
+import java.io.IOException;
+
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.auth.AWSCredentials;
@@ -151,7 +153,13 @@ public class S3Endpoint extends ScheduledPollEndpoint {
 
         Exchange exchange = super.createExchange(pattern);
         Message message = exchange.getIn();
-        message.setBody(s3Object.getObjectContent());
+
+        if (configuration.isIncludeBody()) {
+            message.setBody(s3Object.getObjectContent());
+        } else {
+            message.setBody(null);
+        }
+
         message.setHeader(S3Constants.KEY, s3Object.getKey());
         message.setHeader(S3Constants.BUCKET_NAME, s3Object.getBucketName());
         message.setHeader(S3Constants.E_TAG, objectMetadata.getETag());
@@ -166,6 +174,18 @@ public class S3Endpoint extends ScheduledPollEndpoint {
         message.setHeader(S3Constants.S3_HEADERS, 
objectMetadata.getRawMetadata());
         message.setHeader(S3Constants.SERVER_SIDE_ENCRYPTION, 
objectMetadata.getSSEAlgorithm());
 
+        /**
+         * If includeBody != true, it is safe to close the object here.  If 
includeBody == true,
+         * the caller is responsible for closing the stream and object once 
the body has been fully consumed.
+         * As of 2.17, the consumer does not close the stream or object on 
commit.
+         */
+        if (!configuration.isIncludeBody()) {
+            try {
+                s3Object.close();
+            } catch (IOException e) {
+            }
+        }
+
         return exchange;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/cb9667ea/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentConfigurationTest.java
 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentConfigurationTest.java
index f29a193..1490f69 100644
--- 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentConfigurationTest.java
+++ 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentConfigurationTest.java
@@ -38,6 +38,7 @@ public class S3ComponentConfigurationTest extends 
CamelTestSupport {
         assertNull(endpoint.getConfiguration().getAmazonS3Endpoint());
         assertNull(endpoint.getConfiguration().getPolicy());
         assertNull(endpoint.getConfiguration().getPrefix());
+        assertTrue(endpoint.getConfiguration().isIncludeBody());
     }
     
     @Test
@@ -59,6 +60,7 @@ public class S3ComponentConfigurationTest extends 
CamelTestSupport {
         assertNull(endpoint.getConfiguration().getAmazonS3Endpoint());
         assertNull(endpoint.getConfiguration().getPolicy());
         assertNull(endpoint.getConfiguration().getPrefix());
+        assertTrue(endpoint.getConfiguration().isIncludeBody());
     }
 
     @Test
@@ -68,8 +70,8 @@ public class S3ComponentConfigurationTest extends 
CamelTestSupport {
                 + 
"&accessKey=xxx&secretKey=yyy&region=us-west-1&deleteAfterRead=false&maxMessagesPerPoll=1&policy=%7B%22Version%22%3A%222008-10-17%22,%22Id%22%3A%22Policy4324355464%22,"
                 + 
"%22Statement%22%3A%5B%7B%22Sid%22%3A%22Stmt456464646477%22,%22Action%22%3A%5B%22s3%3AGetObject%22%5D,%22Effect%22%3A%22Allow%22,"
                 + 
"%22Resource%22%3A%5B%22arn%3Aaws%3As3%3A%3A%3Amybucket/some/path/*%22%5D,%22Principal%22%3A%7B%22AWS%22%3A%5B%22*%22%5D%7D%7D%5D%7D&storageClass=REDUCED_REDUNDANCY"
-                + "&prefix=confidential");
-        
+                + "&prefix=confidential&includeBody=false");
+
         assertEquals("MyBucket", endpoint.getConfiguration().getBucketName());
         assertEquals("xxx", endpoint.getConfiguration().getAccessKey());
         assertEquals("yyy", endpoint.getConfiguration().getSecretKey());
@@ -82,6 +84,7 @@ public class S3ComponentConfigurationTest extends 
CamelTestSupport {
                 + 
"[\"arn:aws:s3:::mybucket/some/path/*\"],\"Principal\":{\"AWS\":[\"*\"]}}]}", 
endpoint.getConfiguration().getPolicy());
         assertEquals("REDUCED_REDUNDANCY", 
endpoint.getConfiguration().getStorageClass());
         assertEquals("confidential", endpoint.getConfiguration().getPrefix());
+        assertFalse(endpoint.getConfiguration().isIncludeBody());
     }
     
     @Test(expected = IllegalArgumentException.class)

http://git-wip-us.apache.org/repos/asf/camel/blob/cb9667ea/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3IncludeBodyTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3IncludeBodyTest.java
 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3IncludeBodyTest.java
new file mode 100644
index 0000000..6d5ecf5
--- /dev/null
+++ 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3IncludeBodyTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.s3;
+
+import java.io.UnsupportedEncodingException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.util.StringInputStream;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Test to verify that the body is not retrieved when the includeBody option 
is false
+ */
+public class S3IncludeBodyTest extends CamelTestSupport {
+    
+    @Test
+    public void testConsumePrefixedMessages() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+        
+        assertMockEndpointsSatisfied();
+        assertNull("Expected body to be empty", 
mock.getExchanges().get(0).getIn().getBody(String.class));
+    }
+    
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry registry = super.createRegistry();
+        
+        registry.bind("amazonS3Client", new DummyAmazonS3Client());
+        
+        return registry;
+    }
+    
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                
from("aws-s3://mycamelbucket?amazonS3Client=#amazonS3Client&region=us-west-1&delay=50"
 
+                        + 
"&maxMessagesPerPoll=5&prefix=confidential&includeBody=false")
+                    .to("mock:result");
+            }
+        };
+    }
+
+    class DummyAmazonS3Client extends AmazonS3Client {
+        
+        private AtomicInteger requestCount = new AtomicInteger(0);
+        
+        public DummyAmazonS3Client() {
+            super(new BasicAWSCredentials("myAccessKey", "mySecretKey"));
+        }
+
+        @Override
+        public ObjectListing listObjects(ListObjectsRequest request) throws 
AmazonClientException, AmazonServiceException {
+            int currentRequestCount = requestCount.incrementAndGet();
+            
+            assertEquals("mycamelbucket", request.getBucketName());
+            if (currentRequestCount == 2) {
+                assertEquals("confidential", request.getPrefix());
+            }
+            
+            ObjectListing response = new ObjectListing();
+            response.setBucketName(request.getBucketName());
+            response.setPrefix(request.getPrefix());
+
+            S3ObjectSummary s3ObjectSummary = new S3ObjectSummary();
+            s3ObjectSummary.setBucketName(request.getBucketName());
+            s3ObjectSummary.setKey("key");
+            response.getObjectSummaries().add(s3ObjectSummary);
+            
+            return response;
+        }
+        
+        @Override
+        public S3Object getObject(String bucketName, String key) throws 
AmazonClientException, AmazonServiceException {
+            assertEquals("mycamelbucket", bucketName);
+            assertEquals("key", key);
+            
+            S3Object s3Object = new S3Object();
+            s3Object.setBucketName(bucketName);
+            s3Object.setKey(key);
+            try {
+                s3Object.setObjectContent(new StringInputStream("Camel 
rocks!"));
+            } catch (UnsupportedEncodingException e) {
+                // noop
+            }
+            
+            return s3Object;
+        }
+        
+        @Override
+        public void deleteObject(String bucketName, String key) throws 
AmazonClientException, AmazonServiceException {
+            //noop
+        }
+    }
+}
\ No newline at end of file

Reply via email to