CAMEL-8105 support for redrivePolicy inside SQSEndpoint with thanks to Rufus
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ced84063 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ced84063 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ced84063 Branch: refs/heads/master Commit: ced84063a2f56ce555cc2a6a4df53afc4d35c5be Parents: a3bf847 Author: Willem Jiang <willem.ji...@gmail.com> Authored: Wed Dec 3 21:51:18 2014 +0800 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Wed Dec 3 21:59:37 2014 +0800 ---------------------------------------------------------------------- .../camel/component/aws/sqs/SqsConfiguration.java | 12 ++++++++++++ .../org/apache/camel/component/aws/sqs/SqsEndpoint.java | 6 ++++++ .../aws/sqs/SqsComponentConfigurationTest.java | 6 +++++- 3 files changed, 23 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ced84063/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java index de24cc5..718aee0 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java @@ -52,6 +52,9 @@ public class SqsConfiguration { private Integer messageRetentionPeriod; private Integer receiveMessageWaitTimeSeconds; private String policy; + + // dead letter queue properties + private String redrivePolicy; public void setAmazonSQSEndpoint(String amazonSQSEndpoint) { this.amazonSQSEndpoint = amazonSQSEndpoint; @@ -165,6 +168,14 @@ public class SqsConfiguration { this.policy = policy; } + public String 
getRedrivePolicy() { + return redrivePolicy; + } + + public void setRedrivePolicy(String redrivePolicy) { + this.redrivePolicy = redrivePolicy; + } + public boolean isExtendMessageVisibility() { return this.extendMessageVisibility; } @@ -231,6 +242,7 @@ public class SqsConfiguration { + ", receiveMessageWaitTimeSeconds=" + receiveMessageWaitTimeSeconds + ", delaySeconds=" + delaySeconds + ", policy=" + policy + + ", redrivePolicy=" + redrivePolicy + ", extendMessageVisibility=" + extendMessageVisibility + ", queueOwnerAWSAccountId=" + queueOwnerAWSAccountId + ", region=" + region http://git-wip-us.apache.org/repos/asf/camel/blob/ced84063/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java index 8ed85c5..29ae3db 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java @@ -153,6 +153,9 @@ public class SqsEndpoint extends ScheduledPollEndpoint implements HeaderFilterSt if (getConfiguration().getReceiveMessageWaitTimeSeconds() != null) { request.getAttributes().put(QueueAttributeName.ReceiveMessageWaitTimeSeconds.name(), String.valueOf(getConfiguration().getReceiveMessageWaitTimeSeconds())); } + if (getConfiguration().getRedrivePolicy() != null) { + request.getAttributes().put(QueueAttributeName.RedrivePolicy.name(), getConfiguration().getRedrivePolicy()); + } LOG.trace("Creating queue [{}] with request [{}]...", configuration.getQueueName(), request); CreateQueueResult queueResult = client.createQueue(request); @@ -179,6 +182,9 @@ public class SqsEndpoint extends ScheduledPollEndpoint implements HeaderFilterSt if 
(getConfiguration().getReceiveMessageWaitTimeSeconds() != null) { request.getAttributes().put(QueueAttributeName.ReceiveMessageWaitTimeSeconds.name(), String.valueOf(getConfiguration().getReceiveMessageWaitTimeSeconds())); } + if (getConfiguration().getRedrivePolicy() != null) { + request.getAttributes().put(QueueAttributeName.RedrivePolicy.name(), getConfiguration().getRedrivePolicy()); + } if (!request.getAttributes().isEmpty()) { LOG.trace("Updating queue '{}' with the provided queue attributes...", configuration.getQueueName()); client.setQueueAttributes(request); http://git-wip-us.apache.org/repos/asf/camel/blob/ced84063/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java index e7fb9dc..e7eac1f 100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java @@ -43,6 +43,7 @@ public class SqsComponentConfigurationTest extends CamelTestSupport { assertNull(endpoint.getConfiguration().getMaximumMessageSize()); assertNull(endpoint.getConfiguration().getMessageRetentionPeriod()); assertNull(endpoint.getConfiguration().getPolicy()); + assertNull(endpoint.getConfiguration().getRedrivePolicy()); assertNull(endpoint.getConfiguration().getRegion()); } @@ -67,6 +68,7 @@ public class SqsComponentConfigurationTest extends CamelTestSupport { assertNull(endpoint.getConfiguration().getMaximumMessageSize()); assertNull(endpoint.getConfiguration().getMessageRetentionPeriod()); assertNull(endpoint.getConfiguration().getPolicy()); + 
assertNull(endpoint.getConfiguration().getRedrivePolicy()); assertNull(endpoint.getConfiguration().getRegion()); } @@ -88,7 +90,8 @@ public class SqsComponentConfigurationTest extends CamelTestSupport { + "%7B%22Version%22%3A%222008-10-17%22%2C%22Id%22%3A%22%2F195004372649%2FMyQueue%2FSQSDefaultPolicy%22%2C%22Statement%22%3A%5B%7B%22Sid%22%3A%22Queue1ReceiveMessage%22%2C%22" + "Effect%22%3A%22Allow%22%2C%22Principal%22%3A%7B%22AWS%22%3A%22*%22%7D%2C%22Action%22%3A%22SQS%3AReceiveMessage%22%2C%22Resource%22%3A%22%2F195004372649%2FMyQueue%22%7D%5D%7D" + "&delaySeconds=123&receiveMessageWaitTimeSeconds=10&waitTimeSeconds=20" - + "&queueOwnerAWSAccountId=111222333&region=us-east-1"); + + "&queueOwnerAWSAccountId=111222333&region=us-east-1" + + "&redrivePolicy={\"maxReceiveCount\":\"5\", \"deadLetterTargetArn\":\"arn:aws:sqs:us-east-1:195004372649:MyDeadLetterQueue\"}"); assertEquals("MyQueue", endpoint.getConfiguration().getQueueName()); assertEquals("xxx", endpoint.getConfiguration().getAccessKey()); @@ -104,6 +107,7 @@ public class SqsComponentConfigurationTest extends CamelTestSupport { assertEquals("{\"Version\":\"2008-10-17\",\"Id\":\"/195004372649/MyQueue/SQSDefaultPolicy\",\"Statement\":[{\"Sid\":\"Queue1ReceiveMessage\",\"Effect\":\"Allow\",\"Principal\":" + "{\"AWS\":\"*\"},\"Action\":\"SQS:ReceiveMessage\",\"Resource\":\"/195004372649/MyQueue\"}]}", endpoint.getConfiguration().getPolicy()); + assertEquals("{\"maxReceiveCount\":\"5\", \"deadLetterTargetArn\":\"arn:aws:sqs:us-east-1:195004372649:MyDeadLetterQueue\"}", endpoint.getConfiguration().getRedrivePolicy()); assertEquals(new Integer(123), endpoint.getConfiguration().getDelaySeconds()); assertEquals(Integer.valueOf(10), endpoint.getConfiguration().getReceiveMessageWaitTimeSeconds()); assertEquals(Integer.valueOf(20), endpoint.getConfiguration().getWaitTimeSeconds());