Author: cmueller
Date: Sun May 15 15:47:25 2011
New Revision: 1103434

URL: http://svn.apache.org/viewvc?rev=1103434&view=rev
Log:
CAMEL-3869: Patch for Camel AWS SQS endpoint to set maximum message size and 
message retention period - work in progress

Modified:
    
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
    
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
    
camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java
    
camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java
    
camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentTest.java
    
camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/integration/SqsComponentIntegrationTest.java
    
camel/trunk/components/camel-aws/src/test/resources/org/apache/camel/component/aws/sqs/SqsComponentSpringTest-context.xml

Modified: 
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java?rev=1103434&r1=1103433&r2=1103434&view=diff
==============================================================================
--- 
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
 (original)
+++ 
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
 Sun May 15 15:47:25 2011
@@ -16,10 +16,10 @@
  */
 package org.apache.camel.component.aws.sqs;
 
-import java.util.Collection;
-
 import com.amazonaws.services.sqs.AmazonSQSClient;
 
+import java.util.Collection;
+
 /**
  * The AWS SQS component configuration properties
  * 
@@ -39,6 +39,11 @@ public class SqsConfiguration {
     private Collection<String> attributeNames;
     private Integer defaultVisibilityTimeout;
     
+    // queue properties
+    private Integer maximumMessageSize;
+    private Integer messageRetentionPeriod;
+    private String policy;
+    
     public void setAmazonSQSEndpoint(String amazonSQSEndpoint) {
         this.amazonSQSEndpoint = amazonSQSEndpoint;
     }
@@ -111,6 +116,30 @@ public class SqsConfiguration {
         this.defaultVisibilityTimeout = defaultVisibilityTimeout;
     }
 
+    public Integer getMaximumMessageSize() {
+        return maximumMessageSize;
+    }
+
+    public void setMaximumMessageSize(Integer maximumMessageSize) {
+        this.maximumMessageSize = maximumMessageSize;
+    }
+
+    public Integer getMessageRetentionPeriod() {
+        return messageRetentionPeriod;
+    }
+
+    public void setMessageRetentionPeriod(Integer messageRetentionPeriod) {
+        this.messageRetentionPeriod = messageRetentionPeriod;
+    }
+    
+    public String getPolicy() {
+        return policy;
+    }
+
+    public void setPolicy(String policy) {
+        this.policy = policy;
+    }
+
     @Override
     public String toString() {
         return "SqsConfiguration[queueName=" + queueName
@@ -121,6 +150,9 @@ public class SqsConfiguration {
             + ", visibilityTimeout=" + visibilityTimeout
             + ", attributeNames=" + attributeNames
             + ", defaultVisibilityTimeout=" + defaultVisibilityTimeout
+            + ", maximumMessageSize=" + maximumMessageSize
+            + ", messageRetentionPeriod=" + messageRetentionPeriod
+            + ", policy=" + policy
             + "]";
     }
 }
\ No newline at end of file

Modified: 
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java?rev=1103434&r1=1103433&r2=1103434&view=diff
==============================================================================
--- 
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
 (original)
+++ 
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
 Sun May 15 15:47:25 2011
@@ -16,13 +16,12 @@
  */
 package org.apache.camel.component.aws.sqs;
 
-import java.util.HashMap;
-
 import com.amazonaws.auth.AWSCredentials;
 import com.amazonaws.auth.BasicAWSCredentials;
 import com.amazonaws.services.sqs.AmazonSQSClient;
 import com.amazonaws.services.sqs.model.CreateQueueRequest;
 import com.amazonaws.services.sqs.model.CreateQueueResult;
+import com.amazonaws.services.sqs.model.SetQueueAttributesRequest;
 
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
@@ -35,6 +34,8 @@ import org.apache.camel.impl.ScheduledPo
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+
 /**
  * Defines the <a href="http://camel.apache.org/aws.html">AWS SQS 
Endpoint</a>.  
  *
@@ -74,7 +75,9 @@ public class SqsEndpoint extends Schedul
         
         // creates a new queue, or returns the URL of an existing one
         CreateQueueRequest request = new 
CreateQueueRequest(configuration.getQueueName());
-        
request.setDefaultVisibilityTimeout(getConfiguration().getDefaultVisibilityTimeout()
 != null ? getConfiguration().getDefaultVisibilityTimeout() : null);
+        if (getConfiguration().getDefaultVisibilityTimeout() != null) {
+            
request.setDefaultVisibilityTimeout(getConfiguration().getDefaultVisibilityTimeout());
+        }
         
         LOG.trace("Creating queue [{}] with request [{}]...", 
configuration.getQueueName(), request);
         
@@ -82,6 +85,29 @@ public class SqsEndpoint extends Schedul
         queueUrl = queueResult.getQueueUrl();
         
         LOG.trace("Queue created and available at: {}", queueUrl);
+
+        // According to the documentation, only one setting can be made at a 
time, even though they go into a Map.
+        if (getConfiguration().getMaximumMessageSize() != null) {
+            updateAttribute("MaximumMessageSize", 
getConfiguration().getMaximumMessageSize());
+        }
+        if (getConfiguration().getMessageRetentionPeriod() != null) {
+            updateAttribute("MessageRetentionPeriod", 
getConfiguration().getMessageRetentionPeriod());
+        }
+        if (getConfiguration().getPolicy() != null) {
+            updateAttribute("Policy", getConfiguration().getPolicy());
+        }
+    }
+
+    protected void updateAttribute(String attribute, Object value) {
+        SetQueueAttributesRequest setQueueAttributesRequest = new 
SetQueueAttributesRequest();
+        setQueueAttributesRequest.setQueueUrl(queueUrl);
+        setQueueAttributesRequest.getAttributes().put(attribute, 
String.valueOf(value));
+        
+        LOG.trace("Updating queue [{}] with request: {}", 
configuration.getQueueName(), setQueueAttributesRequest);
+        
+        client.setQueueAttributes(setQueueAttributesRequest);
+        
+        LOG.trace("Queue updated");
     }
 
     @Override

Modified: 
camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java?rev=1103434&r1=1103433&r2=1103434&view=diff
==============================================================================
--- 
camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java
 (original)
+++ 
camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java
 Sun May 15 15:47:25 2011
@@ -16,26 +16,17 @@
  */
 package org.apache.camel.component.aws.sqs;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.amazonaws.services.sqs.model.CreateQueueRequest;
-import com.amazonaws.services.sqs.model.CreateQueueResult;
-import com.amazonaws.services.sqs.model.DeleteMessageRequest;
-import com.amazonaws.services.sqs.model.Message;
-import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
-import com.amazonaws.services.sqs.model.ReceiveMessageResult;
-import com.amazonaws.services.sqs.model.SendMessageRequest;
-import com.amazonaws.services.sqs.model.SendMessageResult;
+import com.amazonaws.services.sqs.model.*;
+
+import java.util.*;
 
 public class AmazonSQSClientMock extends AmazonSQSClient {
     
     List<Message> messages = new ArrayList<Message>();
+    Map<String, Map<String, String>> queueAttributes = new HashMap<String, 
Map<String, String>>();
     
     public AmazonSQSClientMock() {
         super(null);
@@ -89,4 +80,16 @@ public class AmazonSQSClientMock extends
     public void deleteMessage(DeleteMessageRequest deleteMessageRequest) 
throws AmazonServiceException, AmazonClientException {
         // noop
     }
+
+    @Override
+    public void setQueueAttributes(SetQueueAttributesRequest 
setQueueAttributesRequest) throws AmazonServiceException, AmazonClientException 
{
+        synchronized (queueAttributes) {
+            if 
(!queueAttributes.containsKey(setQueueAttributesRequest.getQueueUrl())) {
+                queueAttributes.put(setQueueAttributesRequest.getQueueUrl(), 
new HashMap<String, String>());
+            }
+            for (final Map.Entry<String, String> entry : 
setQueueAttributesRequest.getAttributes().entrySet()) {
+                
queueAttributes.get(setQueueAttributesRequest.getQueueUrl()).put(entry.getKey(),
 entry.getValue());
+            }
+        }
+    }
 }
\ No newline at end of file

Modified: 
camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java?rev=1103434&r1=1103433&r2=1103434&view=diff
==============================================================================
--- 
camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java
 (original)
+++ 
camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java
 Sun May 15 15:47:25 2011
@@ -16,14 +16,14 @@
  */
 package org.apache.camel.component.aws.sqs;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.camel.impl.JndiRegistry;
 import org.apache.camel.impl.PropertyPlaceholderDelegateRegistry;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.List;
+
 public class SqsComponentConfigurationTest extends CamelTestSupport {
     
     @Test
@@ -39,6 +39,9 @@ public class SqsComponentConfigurationTe
         assertNull(endpoint.getConfiguration().getDefaultVisibilityTimeout());
         assertNull(endpoint.getConfiguration().getVisibilityTimeout());
         assertNull(endpoint.getConfiguration().getAmazonSQSEndpoint());
+        assertNull(endpoint.getConfiguration().getMaximumMessageSize());
+        assertNull(endpoint.getConfiguration().getMessageRetentionPeriod());
+        assertNull(endpoint.getConfiguration().getPolicy());
     }
     
     @Test
@@ -58,6 +61,9 @@ public class SqsComponentConfigurationTe
         assertNull(endpoint.getConfiguration().getDefaultVisibilityTimeout());
         assertNull(endpoint.getConfiguration().getVisibilityTimeout());
         assertNull(endpoint.getConfiguration().getAmazonSQSEndpoint());
+        assertNull(endpoint.getConfiguration().getMaximumMessageSize());
+        assertNull(endpoint.getConfiguration().getMessageRetentionPeriod());
+        assertNull(endpoint.getConfiguration().getPolicy());
     }
     
     @Test
@@ -70,7 +76,9 @@ public class SqsComponentConfigurationTe
         
         SqsComponent component = new SqsComponent(context);
         SqsEndpoint endpoint = (SqsEndpoint) 
component.createEndpoint("aws-sqs://MyQueue?amazonSQSEndpoint=sns.eu-west-1.amazonaws.com&accessKey=xxx&secretKey=yyy&attributeNames=#attributeNames"
-                + "&DefaultVisibilityTimeout=1000&visibilityTimeout=2000");
+                + 
"&DefaultVisibilityTimeout=1000&visibilityTimeout=2000&maximumMessageSize=65536&messageRetentionPeriod=1209600&policy="
+                + 
"%7B%22Version%22%3A%222008-10-17%22%2C%22Id%22%3A%22%2F195004372649%2FMyQueue%2FSQSDefaultPolicy%22%2C%22Statement%22%3A%5B%7B%22Sid%22%3A%22Queue1ReceiveMessage%22%2C%22"
+                + 
"Effect%22%3A%22Allow%22%2C%22Principal%22%3A%7B%22AWS%22%3A%22*%22%7D%2C%22Action%22%3A%22SQS%3AReceiveMessage%22%2C%22Resource%22%3A%22%2F195004372649%2FMyQueue%22%7D%5D%7D");
         
         assertEquals("MyQueue", endpoint.getConfiguration().getQueueName());
         assertEquals("xxx", endpoint.getConfiguration().getAccessKey());
@@ -80,6 +88,11 @@ public class SqsComponentConfigurationTe
         assertEquals(new Integer(1000), 
endpoint.getConfiguration().getDefaultVisibilityTimeout());
         assertEquals(new Integer(2000), 
endpoint.getConfiguration().getVisibilityTimeout());
         assertEquals("sns.eu-west-1.amazonaws.com", 
endpoint.getConfiguration().getAmazonSQSEndpoint());
+        assertEquals(new Integer(65536), 
endpoint.getConfiguration().getMaximumMessageSize());
+        assertEquals(new Integer(1209600), 
endpoint.getConfiguration().getMessageRetentionPeriod());
+        
assertEquals("{\"Version\":\"2008-10-17\",\"Id\":\"/195004372649/MyQueue/SQSDefaultPolicy\",\"Statement\":[{\"Sid\":\"Queue1ReceiveMessage\",\"Effect\":\"Allow\",\"Principal\":"
+                + 
"{\"AWS\":\"*\"},\"Action\":\"SQS:ReceiveMessage\",\"Resource\":\"/195004372649/MyQueue\"}]}",
+                endpoint.getConfiguration().getPolicy());
     }
     
     @Test

Modified: 
camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentTest.java?rev=1103434&r1=1103433&r2=1103434&view=diff
==============================================================================
--- 
camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentTest.java
 (original)
+++ 
camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentTest.java
 Sun May 15 15:47:25 2011
@@ -94,12 +94,14 @@ public class SqsComponentTest extends Ca
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
+            final String sqsURI = 
String.format("aws-sqs://MyQueue?amazonSQSClient=#amazonSQSClient&messageRetentionPeriod=%s&maximumMessageSize=%s&policy=%s",
+                    "1209600", "65536", "");
             @Override
             public void configure() throws Exception {
                 from("direct:start")
-                    .to("aws-sqs://MyQueue?amazonSQSClient=#amazonSQSClient");
+                    .to(sqsURI);
                 
-                from("aws-sqs://MyQueue?amazonSQSClient=#amazonSQSClient")
+                from(sqsURI)
                     .to("mock:result");
             }
         };

Modified: 
camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/integration/SqsComponentIntegrationTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/integration/SqsComponentIntegrationTest.java?rev=1103434&r1=1103433&r2=1103434&view=diff
==============================================================================
--- 
camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/integration/SqsComponentIntegrationTest.java
 (original)
+++ 
camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/integration/SqsComponentIntegrationTest.java
 Sun May 15 15:47:25 2011
@@ -28,8 +28,12 @@ import org.apache.camel.test.junit4.Came
 import org.junit.Ignore;
 import org.junit.Test;
 
+@Ignore("Must be manually tested. Provide your own accessKey and secretKey!")
 public class SqsComponentIntegrationTest extends CamelTestSupport {
     
+    private String accessKey = "xxx";
+    private String secretKey = "yyy";
+    
     @EndpointInject(uri = "direct:start")
     private ProducerTemplate template;
     
@@ -37,7 +41,6 @@ public class SqsComponentIntegrationTest
     private MockEndpoint result;
     
     @Test
-    @Ignore("Must be manually tested. Provide your own accessKey and 
secretKey!")
     public void sendInOnly() throws Exception {
         result.expectedMessageCount(1);
         
@@ -61,7 +64,6 @@ public class SqsComponentIntegrationTest
     }
     
     @Test
-    @Ignore("Must be manually tested. Provide your own accessKey and 
secretKey!")
     public void sendInOut() throws Exception {
         result.expectedMessageCount(1);
         
@@ -85,13 +87,15 @@ public class SqsComponentIntegrationTest
     }
     
     protected RouteBuilder createRouteBuilder() throws Exception {
+        final String sqsURI = 
String.format("aws-sqs://MyQueue?accessKey=%s&secretKey=%s&messageRetentionPeriod=%s&maximumMessageSize=%s&policy=%s",
+                accessKey, secretKey, "1209600", "65536", "");
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
                 from("direct:start")
-                    .to("aws-sqs://MyQueue?accessKey=xxx&secretKey=yyy");
+                    .to(sqsURI);
                 
-                from("aws-sqs://MyQueue?accessKey=xxx&secretKey=yyy")
+                from(sqsURI)
                     .to("mock:result");
             }
         };

Modified: 
camel/trunk/components/camel-aws/src/test/resources/org/apache/camel/component/aws/sqs/SqsComponentSpringTest-context.xml
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/test/resources/org/apache/camel/component/aws/sqs/SqsComponentSpringTest-context.xml?rev=1103434&r1=1103433&r2=1103434&view=diff
==============================================================================
--- 
camel/trunk/components/camel-aws/src/test/resources/org/apache/camel/component/aws/sqs/SqsComponentSpringTest-context.xml
 (original)
+++ 
camel/trunk/components/camel-aws/src/test/resources/org/apache/camel/component/aws/sqs/SqsComponentSpringTest-context.xml
 Sun May 15 15:47:25 2011
@@ -23,11 +23,11 @@
     <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
         <route>
             <from uri="direct:start"/>
-            <to uri="aws-sqs://MyQueue?amazonSQSClient=#amazonSQSClient"/>
+            <to 
uri="aws-sqs://MyQueue?amazonSQSClient=#amazonSQSClient&messageRetentionPeriod=1209600&maximumMessageSize=65536&policy="/>
         </route>
 
         <route>
-            <from uri="aws-sqs://MyQueue?amazonSQSClient=#amazonSQSClient"/>
+            <from 
uri="aws-sqs://MyQueue?amazonSQSClient=#amazonSQSClient&messageRetentionPeriod=1209600&maximumMessageSize=65536&policy="/>
             <to uri="mock:result"/>
         </route>
     </camelContext>


Reply via email to