Author: cmueller Date: Wed Sep 5 19:57:23 2012 New Revision: 1381328 URL: http://svn.apache.org/viewvc?rev=1381328&view=rev Log: CAMEL-5414: SqsEndpoint can't retrieve existing queue url with visibility timeout different than default
Added: camel/branches/camel-2.10.x/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointUseExistingQueueTest.java Modified: camel/branches/camel-2.10.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java camel/branches/camel-2.10.x/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java Modified: camel/branches/camel-2.10.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java?rev=1381328&r1=1381327&r2=1381328&view=diff ============================================================================== --- camel/branches/camel-2.10.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java (original) +++ camel/branches/camel-2.10.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java Wed Sep 5 19:57:23 2012 @@ -23,6 +23,7 @@ import com.amazonaws.auth.BasicAWSCreden import com.amazonaws.services.sqs.AmazonSQSClient; import com.amazonaws.services.sqs.model.CreateQueueRequest; import com.amazonaws.services.sqs.model.CreateQueueResult; +import com.amazonaws.services.sqs.model.ListQueuesResult; import com.amazonaws.services.sqs.model.QueueAttributeName; import org.apache.camel.Consumer; @@ -74,27 +75,41 @@ public class SqsEndpoint extends Schedul protected void doStart() throws Exception { client = getConfiguration().getAmazonSQSClient() != null ? getConfiguration().getAmazonSQSClient() : getClient(); - - // creates a new queue, or returns the URL of an existing one - CreateQueueRequest request = new CreateQueueRequest(configuration.getQueueName()); - if (getConfiguration().getDefaultVisibilityTimeout() != null) { - request.getAttributes().put(QueueAttributeName.VisibilityTimeout.name(), String.valueOf(getConfiguration().getDefaultVisibilityTimeout())); - } - if (getConfiguration().getMaximumMessageSize() != null) { - request.getAttributes().put(QueueAttributeName.MaximumMessageSize.name(), String.valueOf(getConfiguration().getMaximumMessageSize())); - } - if (getConfiguration().getMessageRetentionPeriod() != null) { - request.getAttributes().put(QueueAttributeName.MessageRetentionPeriod.name(), String.valueOf(getConfiguration().getMessageRetentionPeriod())); + + // check whether the queue already exists + ListQueuesResult listQueuesResult = client.listQueues(); + for (String url : listQueuesResult.getQueueUrls()) { + if (url.endsWith("/" + configuration.getQueueName())) { + queueUrl = url; + LOG.trace("Queue available at '{}'. Using existing queue attributes!", queueUrl); + break; + } } - if (getConfiguration().getPolicy() != null) { - request.getAttributes().put(QueueAttributeName.Policy.name(), String.valueOf(getConfiguration().getPolicy())); - } - LOG.trace("Creating queue [{}] with request [{}]...", configuration.getQueueName(), request); - CreateQueueResult queueResult = client.createQueue(request); - queueUrl = queueResult.getQueueUrl(); - - LOG.trace("Queue created and available at: {}", queueUrl); + if (queueUrl == null) { + LOG.trace("Queue '{}' doesn't exist. Will create it...", configuration.getQueueName()); + + // creates a new queue, or returns the URL of an existing one + CreateQueueRequest request = new CreateQueueRequest(configuration.getQueueName()); + if (getConfiguration().getDefaultVisibilityTimeout() != null) { + request.getAttributes().put(QueueAttributeName.VisibilityTimeout.name(), String.valueOf(getConfiguration().getDefaultVisibilityTimeout())); + } + if (getConfiguration().getMaximumMessageSize() != null) { + request.getAttributes().put(QueueAttributeName.MaximumMessageSize.name(), String.valueOf(getConfiguration().getMaximumMessageSize())); + } + if (getConfiguration().getMessageRetentionPeriod() != null) { + request.getAttributes().put(QueueAttributeName.MessageRetentionPeriod.name(), String.valueOf(getConfiguration().getMessageRetentionPeriod())); + } + if (getConfiguration().getPolicy() != null) { + request.getAttributes().put(QueueAttributeName.Policy.name(), String.valueOf(getConfiguration().getPolicy())); + } + LOG.trace("Creating queue [{}] with request [{}]...", configuration.getQueueName(), request); + + CreateQueueResult queueResult = client.createQueue(request); + queueUrl = queueResult.getQueueUrl(); + + LOG.trace("Queue created and available at: {}", queueUrl); + } } @Override Modified: camel/branches/camel-2.10.x/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java?rev=1381328&r1=1381327&r2=1381328&view=diff ============================================================================== --- camel/branches/camel-2.10.x/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java (original) +++ camel/branches/camel-2.10.x/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java Wed Sep 5 19:57:23 2012 @@ -31,6 +31,7 @@ import com.amazonaws.services.sqs.model. import com.amazonaws.services.sqs.model.CreateQueueRequest; import com.amazonaws.services.sqs.model.CreateQueueResult; import com.amazonaws.services.sqs.model.DeleteMessageRequest; +import com.amazonaws.services.sqs.model.ListQueuesResult; import com.amazonaws.services.sqs.model.Message; import com.amazonaws.services.sqs.model.ReceiveMessageRequest; import com.amazonaws.services.sqs.model.ReceiveMessageResult; @@ -49,6 +50,12 @@ public class AmazonSQSClientMock extends } @Override + public ListQueuesResult listQueues() throws AmazonServiceException, AmazonClientException { + ListQueuesResult result = new ListQueuesResult(); + return result; + } + + @Override public CreateQueueResult createQueue(CreateQueueRequest createQueueRequest) throws AmazonServiceException, AmazonClientException { CreateQueueResult result = new CreateQueueResult(); result.setQueueUrl("https://queue.amazonaws.com/541925086079/MyQueue"); Added: camel/branches/camel-2.10.x/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointUseExistingQueueTest.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointUseExistingQueueTest.java?rev=1381328&view=auto ============================================================================== --- camel/branches/camel-2.10.x/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointUseExistingQueueTest.java (added) +++ camel/branches/camel-2.10.x/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointUseExistingQueueTest.java Wed Sep 5 19:57:23 2012 @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.sqs; + +import java.util.List; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.sqs.AmazonSQSClient; +import com.amazonaws.services.sqs.model.CreateQueueRequest; +import com.amazonaws.services.sqs.model.CreateQueueResult; +import com.amazonaws.services.sqs.model.ListQueuesResult; +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.ReceiveMessageRequest; +import com.amazonaws.services.sqs.model.ReceiveMessageResult; + +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class SqsEndpointUseExistingQueueTest extends CamelTestSupport { + + @EndpointInject(uri = "mock:result") + private MockEndpoint mock; + + @Test + public void defaultsToDisabled() throws Exception { + this.mock.expectedMessageCount(1); + + assertMockEndpointsSatisfied(); // Wait for message to arrive. + } + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry registry = super.createRegistry(); + + AmazonSQSClientMock clientMock = new SqsEndpointUseExistingQueueTest.AmazonSQSClientMock(); + registry.bind("amazonSQSClient", clientMock); + + return registry; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("aws-sqs://MyQueue?amazonSQSClient=#amazonSQSClient") + .to("mock:result"); + } + }; + } + + static class AmazonSQSClientMock extends AmazonSQSClient { + + public AmazonSQSClientMock() { + super(new BasicAWSCredentials("myAccessKey", "mySecretKey")); + } + + @Override + public ListQueuesResult listQueues() throws AmazonServiceException, AmazonClientException { + ListQueuesResult result = new ListQueuesResult(); + result.getQueueUrls().add("http://queue.amazonaws.com/0815/Foo"); + result.getQueueUrls().add("http://queue.amazonaws.com/0815/MyQueue"); + result.getQueueUrls().add("http://queue.amazonaws.com/0815/Bar"); + return result; + } + + @Override + public CreateQueueResult createQueue(CreateQueueRequest createQueueRequest) throws AmazonServiceException, AmazonClientException { + throw new AmazonServiceException("forced exception for test if this method is called"); + } + + @Override + public ReceiveMessageResult receiveMessage(ReceiveMessageRequest receiveMessageRequest) throws AmazonServiceException, AmazonClientException { + ReceiveMessageResult result = new ReceiveMessageResult(); + List<Message> resultMessages = result.getMessages(); + Message message = new Message(); + resultMessages.add(message); + + return result; + } + } +}