CAMEL-8105 support for redrivePolicy inside SQSEndpoint with thanks to Rufus


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ced84063
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ced84063
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ced84063

Branch: refs/heads/master
Commit: ced84063a2f56ce555cc2a6a4df53afc4d35c5be
Parents: a3bf847
Author: Willem Jiang <willem.ji...@gmail.com>
Authored: Wed Dec 3 21:51:18 2014 +0800
Committer: Willem Jiang <willem.ji...@gmail.com>
Committed: Wed Dec 3 21:59:37 2014 +0800

----------------------------------------------------------------------
 .../camel/component/aws/sqs/SqsConfiguration.java       | 12 ++++++++++++
 .../org/apache/camel/component/aws/sqs/SqsEndpoint.java |  6 ++++++
 .../aws/sqs/SqsComponentConfigurationTest.java          |  6 +++++-
 3 files changed, 23 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ced84063/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
index de24cc5..718aee0 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
@@ -52,6 +52,9 @@ public class SqsConfiguration {
     private Integer messageRetentionPeriod;
     private Integer receiveMessageWaitTimeSeconds;
     private String policy;
+    
+    // dead letter queue properties
+    private String redrivePolicy;
 
     public void setAmazonSQSEndpoint(String amazonSQSEndpoint) {
         this.amazonSQSEndpoint = amazonSQSEndpoint;
@@ -165,6 +168,14 @@ public class SqsConfiguration {
         this.policy = policy;
     }
 
+    public String getRedrivePolicy() {
+        return redrivePolicy;
+    }
+
+    public void setRedrivePolicy(String redrivePolicy) {
+        this.redrivePolicy = redrivePolicy;
+    }
+
     public boolean isExtendMessageVisibility() {
         return this.extendMessageVisibility;
     }
@@ -231,6 +242,7 @@ public class SqsConfiguration {
             + ", receiveMessageWaitTimeSeconds=" + 
receiveMessageWaitTimeSeconds
             + ", delaySeconds=" + delaySeconds
             + ", policy=" + policy
+            + ", redrivePolicy=" + redrivePolicy
             + ", extendMessageVisibility=" + extendMessageVisibility
             + ", queueOwnerAWSAccountId=" + queueOwnerAWSAccountId
             + ", region=" + region

http://git-wip-us.apache.org/repos/asf/camel/blob/ced84063/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
index 8ed85c5..29ae3db 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
@@ -153,6 +153,9 @@ public class SqsEndpoint extends ScheduledPollEndpoint 
implements HeaderFilterSt
         if (getConfiguration().getReceiveMessageWaitTimeSeconds() != null) {
             
request.getAttributes().put(QueueAttributeName.ReceiveMessageWaitTimeSeconds.name(),
 String.valueOf(getConfiguration().getReceiveMessageWaitTimeSeconds()));
         }
+        if (getConfiguration().getRedrivePolicy() != null) {
+            
request.getAttributes().put(QueueAttributeName.RedrivePolicy.name(), 
getConfiguration().getRedrivePolicy());
+        }
         LOG.trace("Creating queue [{}] with request [{}]...", 
configuration.getQueueName(), request);
         
         CreateQueueResult queueResult = client.createQueue(request);
@@ -179,6 +182,9 @@ public class SqsEndpoint extends ScheduledPollEndpoint 
implements HeaderFilterSt
         if (getConfiguration().getReceiveMessageWaitTimeSeconds() != null) {
             
request.getAttributes().put(QueueAttributeName.ReceiveMessageWaitTimeSeconds.name(),
 String.valueOf(getConfiguration().getReceiveMessageWaitTimeSeconds()));
         }
+        if (getConfiguration().getRedrivePolicy() != null) {
+            
request.getAttributes().put(QueueAttributeName.RedrivePolicy.name(), 
getConfiguration().getRedrivePolicy());
+        }
         if (!request.getAttributes().isEmpty()) {
             LOG.trace("Updating queue '{}' with the provided queue 
attributes...", configuration.getQueueName());
             client.setQueueAttributes(request);

http://git-wip-us.apache.org/repos/asf/camel/blob/ced84063/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java
 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java
index e7fb9dc..e7eac1f 100644
--- 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java
+++ 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java
@@ -43,6 +43,7 @@ public class SqsComponentConfigurationTest extends 
CamelTestSupport {
         assertNull(endpoint.getConfiguration().getMaximumMessageSize());
         assertNull(endpoint.getConfiguration().getMessageRetentionPeriod());
         assertNull(endpoint.getConfiguration().getPolicy());
+        assertNull(endpoint.getConfiguration().getRedrivePolicy());
         assertNull(endpoint.getConfiguration().getRegion());
     }
     
@@ -67,6 +68,7 @@ public class SqsComponentConfigurationTest extends 
CamelTestSupport {
         assertNull(endpoint.getConfiguration().getMaximumMessageSize());
         assertNull(endpoint.getConfiguration().getMessageRetentionPeriod());
         assertNull(endpoint.getConfiguration().getPolicy());
+        assertNull(endpoint.getConfiguration().getRedrivePolicy());
         assertNull(endpoint.getConfiguration().getRegion());
     }
     
@@ -88,7 +90,8 @@ public class SqsComponentConfigurationTest extends 
CamelTestSupport {
                 + 
"%7B%22Version%22%3A%222008-10-17%22%2C%22Id%22%3A%22%2F195004372649%2FMyQueue%2FSQSDefaultPolicy%22%2C%22Statement%22%3A%5B%7B%22Sid%22%3A%22Queue1ReceiveMessage%22%2C%22"
                 + 
"Effect%22%3A%22Allow%22%2C%22Principal%22%3A%7B%22AWS%22%3A%22*%22%7D%2C%22Action%22%3A%22SQS%3AReceiveMessage%22%2C%22Resource%22%3A%22%2F195004372649%2FMyQueue%22%7D%5D%7D"
                 + 
"&delaySeconds=123&receiveMessageWaitTimeSeconds=10&waitTimeSeconds=20"
-                + "&queueOwnerAWSAccountId=111222333&region=us-east-1");
+                + "&queueOwnerAWSAccountId=111222333&region=us-east-1"
+                + "&redrivePolicy={\"maxReceiveCount\":\"5\", 
\"deadLetterTargetArn\":\"arn:aws:sqs:us-east-1:195004372649:MyDeadLetterQueue\"}");
         
         assertEquals("MyQueue", endpoint.getConfiguration().getQueueName());
         assertEquals("xxx", endpoint.getConfiguration().getAccessKey());
@@ -104,6 +107,7 @@ public class SqsComponentConfigurationTest extends 
CamelTestSupport {
         
assertEquals("{\"Version\":\"2008-10-17\",\"Id\":\"/195004372649/MyQueue/SQSDefaultPolicy\",\"Statement\":[{\"Sid\":\"Queue1ReceiveMessage\",\"Effect\":\"Allow\",\"Principal\":"
                 + 
"{\"AWS\":\"*\"},\"Action\":\"SQS:ReceiveMessage\",\"Resource\":\"/195004372649/MyQueue\"}]}",
                 endpoint.getConfiguration().getPolicy());
+        assertEquals("{\"maxReceiveCount\":\"5\", 
\"deadLetterTargetArn\":\"arn:aws:sqs:us-east-1:195004372649:MyDeadLetterQueue\"}",
 endpoint.getConfiguration().getRedrivePolicy());
         assertEquals(new Integer(123), 
endpoint.getConfiguration().getDelaySeconds());
         assertEquals(Integer.valueOf(10), 
endpoint.getConfiguration().getReceiveMessageWaitTimeSeconds());
         assertEquals(Integer.valueOf(20), 
endpoint.getConfiguration().getWaitTimeSeconds());

Reply via email to