Author: cmueller Date: Sun May 15 15:47:25 2011 New Revision: 1103434 URL: http://svn.apache.org/viewvc?rev=1103434&view=rev Log: CAMEL-3869: Patch for Camel AWS SQS endpoint to set maximum message size and message retention period - work in progress
Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentTest.java camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/integration/SqsComponentIntegrationTest.java camel/trunk/components/camel-aws/src/test/resources/org/apache/camel/component/aws/sqs/SqsComponentSpringTest-context.xml Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java?rev=1103434&r1=1103433&r2=1103434&view=diff ============================================================================== --- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java (original) +++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java Sun May 15 15:47:25 2011 @@ -16,10 +16,10 @@ */ package org.apache.camel.component.aws.sqs; -import java.util.Collection; - import com.amazonaws.services.sqs.AmazonSQSClient; +import java.util.Collection; + /** * The AWS SQS component configuration properties * @@ -39,6 +39,11 @@ public class SqsConfiguration { private Collection<String> attributeNames; private Integer defaultVisibilityTimeout; + // queue properties + private Integer maximumMessageSize; + private Integer messageRetentionPeriod; + private String policy; + public void setAmazonSQSEndpoint(String amazonSQSEndpoint) { 
this.amazonSQSEndpoint = amazonSQSEndpoint; } @@ -111,6 +116,30 @@ public class SqsConfiguration { this.defaultVisibilityTimeout = defaultVisibilityTimeout; } + public Integer getMaximumMessageSize() { + return maximumMessageSize; + } + + public void setMaximumMessageSize(Integer maximumMessageSize) { + this.maximumMessageSize = maximumMessageSize; + } + + public Integer getMessageRetentionPeriod() { + return messageRetentionPeriod; + } + + public void setMessageRetentionPeriod(Integer messageRetentionPeriod) { + this.messageRetentionPeriod = messageRetentionPeriod; + } + + public String getPolicy() { + return policy; + } + + public void setPolicy(String policy) { + this.policy = policy; + } + @Override public String toString() { return "SqsConfiguration[queueName=" + queueName @@ -121,6 +150,9 @@ public class SqsConfiguration { + ", visibilityTimeout=" + visibilityTimeout + ", attributeNames=" + attributeNames + ", defaultVisibilityTimeout=" + defaultVisibilityTimeout + + ", maximumMessageSize=" + maximumMessageSize + + ", messageRetentionPeriod=" + messageRetentionPeriod + + ", policy=" + policy + "]"; } } \ No newline at end of file Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java?rev=1103434&r1=1103433&r2=1103434&view=diff ============================================================================== --- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java (original) +++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java Sun May 15 15:47:25 2011 @@ -16,13 +16,12 @@ */ package org.apache.camel.component.aws.sqs; -import java.util.HashMap; - import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.services.sqs.AmazonSQSClient; 
import com.amazonaws.services.sqs.model.CreateQueueRequest; import com.amazonaws.services.sqs.model.CreateQueueResult; +import com.amazonaws.services.sqs.model.SetQueueAttributesRequest; import org.apache.camel.Consumer; import org.apache.camel.Exchange; @@ -35,6 +34,8 @@ import org.apache.camel.impl.ScheduledPo import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; + /** * Defines the <a href="http://camel.apache.org/aws.html">AWS SQS Endpoint</a>. * @@ -74,7 +75,9 @@ public class SqsEndpoint extends Schedul // creates a new queue, or returns the URL of an existing one CreateQueueRequest request = new CreateQueueRequest(configuration.getQueueName()); - request.setDefaultVisibilityTimeout(getConfiguration().getDefaultVisibilityTimeout() != null ? getConfiguration().getDefaultVisibilityTimeout() : null); + if (getConfiguration().getDefaultVisibilityTimeout() != null) { + request.setDefaultVisibilityTimeout(getConfiguration().getDefaultVisibilityTimeout()); + } LOG.trace("Creating queue [{}] with request [{}]...", configuration.getQueueName(), request); @@ -82,6 +85,29 @@ public class SqsEndpoint extends Schedul queueUrl = queueResult.getQueueUrl(); LOG.trace("Queue created and available at: {}", queueUrl); + + // According to the documentation, only one setting can be made at a time, even though they go into a Map. 
+ if (getConfiguration().getMaximumMessageSize() != null) { + updateAttribute("MaximumMessageSize", getConfiguration().getMaximumMessageSize()); + } + if (getConfiguration().getMessageRetentionPeriod() != null) { + updateAttribute("MessageRetentionPeriod", getConfiguration().getMessageRetentionPeriod()); + } + if (getConfiguration().getPolicy() != null) { + updateAttribute("Policy", getConfiguration().getPolicy()); + } + } + + protected void updateAttribute(String attribute, Object value) { + SetQueueAttributesRequest setQueueAttributesRequest = new SetQueueAttributesRequest(); + setQueueAttributesRequest.setQueueUrl(queueUrl); + setQueueAttributesRequest.getAttributes().put(attribute, String.valueOf(value)); + + LOG.trace("Updating queue [{}] with request: {}", configuration.getQueueName(), setQueueAttributesRequest); + + client.setQueueAttributes(setQueueAttributesRequest); + + LOG.trace("Queue updated"); } @Override Modified: camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java?rev=1103434&r1=1103433&r2=1103434&view=diff ============================================================================== --- camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java (original) +++ camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java Sun May 15 15:47:25 2011 @@ -16,26 +16,17 @@ */ package org.apache.camel.component.aws.sqs; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; - import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonServiceException; import com.amazonaws.services.sqs.AmazonSQSClient; -import com.amazonaws.services.sqs.model.CreateQueueRequest; -import 
com.amazonaws.services.sqs.model.CreateQueueResult; -import com.amazonaws.services.sqs.model.DeleteMessageRequest; -import com.amazonaws.services.sqs.model.Message; -import com.amazonaws.services.sqs.model.ReceiveMessageRequest; -import com.amazonaws.services.sqs.model.ReceiveMessageResult; -import com.amazonaws.services.sqs.model.SendMessageRequest; -import com.amazonaws.services.sqs.model.SendMessageResult; +import com.amazonaws.services.sqs.model.*; + +import java.util.*; public class AmazonSQSClientMock extends AmazonSQSClient { List<Message> messages = new ArrayList<Message>(); + Map<String, Map<String, String>> queueAttributes = new HashMap<String, Map<String, String>>(); public AmazonSQSClientMock() { super(null); @@ -89,4 +80,16 @@ public class AmazonSQSClientMock extends public void deleteMessage(DeleteMessageRequest deleteMessageRequest) throws AmazonServiceException, AmazonClientException { // noop } + + @Override + public void setQueueAttributes(SetQueueAttributesRequest setQueueAttributesRequest) throws AmazonServiceException, AmazonClientException { + synchronized (queueAttributes) { + if (!queueAttributes.containsKey(setQueueAttributesRequest.getQueueUrl())) { + queueAttributes.put(setQueueAttributesRequest.getQueueUrl(), new HashMap<String, String>()); + } + for (final Map.Entry<String, String> entry : setQueueAttributesRequest.getAttributes().entrySet()) { + queueAttributes.get(setQueueAttributesRequest.getQueueUrl()).put(entry.getKey(), entry.getValue()); + } + } + } } \ No newline at end of file Modified: camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java?rev=1103434&r1=1103433&r2=1103434&view=diff ============================================================================== --- 
camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java (original) +++ camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java Sun May 15 15:47:25 2011 @@ -16,14 +16,14 @@ */ package org.apache.camel.component.aws.sqs; -import java.util.ArrayList; -import java.util.List; - import org.apache.camel.impl.JndiRegistry; import org.apache.camel.impl.PropertyPlaceholderDelegateRegistry; import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Test; +import java.util.ArrayList; +import java.util.List; + public class SqsComponentConfigurationTest extends CamelTestSupport { @Test @@ -39,6 +39,9 @@ public class SqsComponentConfigurationTe assertNull(endpoint.getConfiguration().getDefaultVisibilityTimeout()); assertNull(endpoint.getConfiguration().getVisibilityTimeout()); assertNull(endpoint.getConfiguration().getAmazonSQSEndpoint()); + assertNull(endpoint.getConfiguration().getMaximumMessageSize()); + assertNull(endpoint.getConfiguration().getMessageRetentionPeriod()); + assertNull(endpoint.getConfiguration().getPolicy()); } @Test @@ -58,6 +61,9 @@ public class SqsComponentConfigurationTe assertNull(endpoint.getConfiguration().getDefaultVisibilityTimeout()); assertNull(endpoint.getConfiguration().getVisibilityTimeout()); assertNull(endpoint.getConfiguration().getAmazonSQSEndpoint()); + assertNull(endpoint.getConfiguration().getMaximumMessageSize()); + assertNull(endpoint.getConfiguration().getMessageRetentionPeriod()); + assertNull(endpoint.getConfiguration().getPolicy()); } @Test @@ -70,7 +76,9 @@ public class SqsComponentConfigurationTe SqsComponent component = new SqsComponent(context); SqsEndpoint endpoint = (SqsEndpoint) component.createEndpoint("aws-sqs://MyQueue?amazonSQSEndpoint=sns.eu-west-1.amazonaws.com&accessKey=xxx&secretKey=yyy&attributeNames=#attributeNames" - + "&DefaultVisibilityTimeout=1000&visibilityTimeout=2000"); + + 
"&DefaultVisibilityTimeout=1000&visibilityTimeout=2000&maximumMessageSize=65536&messageRetentionPeriod=1209600&policy=" + + "%7B%22Version%22%3A%222008-10-17%22%2C%22Id%22%3A%22%2F195004372649%2FMyQueue%2FSQSDefaultPolicy%22%2C%22Statement%22%3A%5B%7B%22Sid%22%3A%22Queue1ReceiveMessage%22%2C%22" + + "Effect%22%3A%22Allow%22%2C%22Principal%22%3A%7B%22AWS%22%3A%22*%22%7D%2C%22Action%22%3A%22SQS%3AReceiveMessage%22%2C%22Resource%22%3A%22%2F195004372649%2FMyQueue%22%7D%5D%7D"); assertEquals("MyQueue", endpoint.getConfiguration().getQueueName()); assertEquals("xxx", endpoint.getConfiguration().getAccessKey()); @@ -80,6 +88,11 @@ public class SqsComponentConfigurationTe assertEquals(new Integer(1000), endpoint.getConfiguration().getDefaultVisibilityTimeout()); assertEquals(new Integer(2000), endpoint.getConfiguration().getVisibilityTimeout()); assertEquals("sns.eu-west-1.amazonaws.com", endpoint.getConfiguration().getAmazonSQSEndpoint()); + assertEquals(new Integer(65536), endpoint.getConfiguration().getMaximumMessageSize()); + assertEquals(new Integer(1209600), endpoint.getConfiguration().getMessageRetentionPeriod()); + assertEquals("{\"Version\":\"2008-10-17\",\"Id\":\"/195004372649/MyQueue/SQSDefaultPolicy\",\"Statement\":[{\"Sid\":\"Queue1ReceiveMessage\",\"Effect\":\"Allow\",\"Principal\":" + + "{\"AWS\":\"*\"},\"Action\":\"SQS:ReceiveMessage\",\"Resource\":\"/195004372649/MyQueue\"}]}", + endpoint.getConfiguration().getPolicy()); } @Test Modified: camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentTest.java?rev=1103434&r1=1103433&r2=1103434&view=diff ============================================================================== --- camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentTest.java (original) +++ 
camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentTest.java Sun May 15 15:47:25 2011 @@ -94,12 +94,14 @@ public class SqsComponentTest extends Ca @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { + final String sqsURI = String.format("aws-sqs://MyQueue?amazonSQSClient=#amazonSQSClient&messageRetentionPeriod=%s&maximumMessageSize=%s&policy=%s", + "1209600", "65536", ""); @Override public void configure() throws Exception { from("direct:start") - .to("aws-sqs://MyQueue?amazonSQSClient=#amazonSQSClient"); + .to(sqsURI); - from("aws-sqs://MyQueue?amazonSQSClient=#amazonSQSClient") + from(sqsURI) .to("mock:result"); } }; Modified: camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/integration/SqsComponentIntegrationTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/integration/SqsComponentIntegrationTest.java?rev=1103434&r1=1103433&r2=1103434&view=diff ============================================================================== --- camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/integration/SqsComponentIntegrationTest.java (original) +++ camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/integration/SqsComponentIntegrationTest.java Sun May 15 15:47:25 2011 @@ -28,8 +28,12 @@ import org.apache.camel.test.junit4.Came import org.junit.Ignore; import org.junit.Test; +@Ignore("Must be manually tested. Provide your own accessKey and secretKey!") public class SqsComponentIntegrationTest extends CamelTestSupport { + private String accessKey = "xxx"; + private String secretKey = "yyy"; + @EndpointInject(uri = "direct:start") private ProducerTemplate template; @@ -37,7 +41,6 @@ public class SqsComponentIntegrationTest private MockEndpoint result; @Test - @Ignore("Must be manually tested. 
Provide your own accessKey and secretKey!") public void sendInOnly() throws Exception { result.expectedMessageCount(1); @@ -61,7 +64,6 @@ public class SqsComponentIntegrationTest } @Test - @Ignore("Must be manually tested. Provide your own accessKey and secretKey!") public void sendInOut() throws Exception { result.expectedMessageCount(1); @@ -85,13 +87,15 @@ public class SqsComponentIntegrationTest } protected RouteBuilder createRouteBuilder() throws Exception { + final String sqsURI = String.format("aws-sqs://MyQueue?accessKey=%s&secretKey=%s&messageRetentionPeriod=%s&maximumMessageSize=%s&policy=%s", + accessKey, secretKey, "1209600", "65536", ""); return new RouteBuilder() { @Override public void configure() throws Exception { from("direct:start") - .to("aws-sqs://MyQueue?accessKey=xxx&secretKey=yyy"); + .to(sqsURI); - from("aws-sqs://MyQueue?accessKey=xxx&secretKey=yyy") + from(sqsURI) .to("mock:result"); } }; Modified: camel/trunk/components/camel-aws/src/test/resources/org/apache/camel/component/aws/sqs/SqsComponentSpringTest-context.xml URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/test/resources/org/apache/camel/component/aws/sqs/SqsComponentSpringTest-context.xml?rev=1103434&r1=1103433&r2=1103434&view=diff ============================================================================== --- camel/trunk/components/camel-aws/src/test/resources/org/apache/camel/component/aws/sqs/SqsComponentSpringTest-context.xml (original) +++ camel/trunk/components/camel-aws/src/test/resources/org/apache/camel/component/aws/sqs/SqsComponentSpringTest-context.xml Sun May 15 15:47:25 2011 @@ -23,11 +23,11 @@ <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> - <to uri="aws-sqs://MyQueue?amazonSQSClient=#amazonSQSClient"/> + <to uri="aws-sqs://MyQueue?amazonSQSClient=#amazonSQSClient&messageRetentionPeriod=1209600&maximumMessageSize=65536&policy="/> </route> <route> - <from 
 uri="aws-sqs://MyQueue?amazonSQSClient=#amazonSQSClient"/> + <from uri="aws-sqs://MyQueue?amazonSQSClient=#amazonSQSClient&messageRetentionPeriod=1209600&maximumMessageSize=65536&policy="/> <to uri="mock:result"/> </route> </camelContext>