This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 3fd3518 CAMEL-15833: prevent a concurrent access error on AWS v2 SQS (#4582) 3fd3518 is described below commit 3fd35185e74ea5b0a2104182b4e8d1a0956c8d14 Author: Otavio Rodolfo Piske <orpi...@users.noreply.github.com> AuthorDate: Mon Nov 9 16:46:24 2020 +0100 CAMEL-15833: prevent a concurrent access error on AWS v2 SQS (#4582) --- .../camel/component/aws2/sqs/Sqs2Endpoint.java | 43 ++++++++++++++++++++-- .../component/aws2/sqs/AmazonSQSClientMock.java | 14 +++++++ 2 files changed, 53 insertions(+), 4 deletions(-) diff --git a/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java b/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java index 8ceef1b..1282b4b 100644 --- a/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java +++ b/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java @@ -49,7 +49,9 @@ import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse; import software.amazon.awssdk.services.sqs.model.ListQueuesResponse; import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; import software.amazon.awssdk.services.sqs.model.QueueAttributeName; +import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException; import software.amazon.awssdk.services.sqs.model.SetQueueAttributesRequest; +import software.amazon.awssdk.services.sqs.model.SqsException; /** * Sending and receive messages to/from AWS SQS service using AWS SDK version 2.x. @@ -189,8 +191,32 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint implements HeaderFilterS } } + private boolean queueExists(SqsClient client) { + LOG.trace("Checking if queue '{}' exists", configuration.getQueueName()); + + GetQueueUrlRequest getQueueUrlRequest = GetQueueUrlRequest.builder() + .queueName(configuration.getQueueName()) + .build(); + try { + queueUrl = client.getQueueUrl(getQueueUrlRequest).queueUrl(); + LOG.trace("Queue '{}' exists and its URL is '{}'", configuration.getQueueName(), + queueUrl); + + return true; + + } catch (QueueDoesNotExistException e) { + LOG.trace("Queue '{}' does not exist", configuration.getQueueName()); + + return false; + } + } + protected void createQueue(SqsClient client) { - LOG.trace("Queue '{}' doesn't exist. Will create it...", configuration.getQueueName()); + if (queueExists(client)) { + return; + } + + LOG.trace("Creating the a queue named '{}'", configuration.getQueueName()); // creates a new queue, or returns the URL of an existing one CreateQueueRequest.Builder request = CreateQueueRequest.builder().queueName(configuration.getQueueName()); @@ -234,11 +260,20 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint implements HeaderFilterS String.valueOf(getConfiguration().getKmsDataKeyReusePeriodSeconds())); } } - LOG.trace("Creating queue [{}] with request [{}]...", configuration.getQueueName(), request); + LOG.trace("Trying to create queue [{}] with request [{}]...", configuration.getQueueName(), request); request.attributes(attributes); - CreateQueueResponse queueResult = client.createQueue(request.build()); - queueUrl = queueResult.queueUrl(); + try { + CreateQueueResponse queueResult = client.createQueue(request.build()); + queueUrl = queueResult.queueUrl(); + } catch (SqsException e) { + if (queueExists(client)) { + LOG.warn("The queue may have been created since last check and could not be created"); + LOG.debug("AWS SDK error preventing queue creation: {}", e.getMessage(), e); + } else { + throw e; + } + } LOG.trace("Queue created and available at: {}", queueUrl); } diff --git a/components/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java b/components/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java index f0c3725..2be4289 100644 --- a/components/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java +++ b/components/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java @@ -28,6 +28,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest; @@ -36,11 +38,14 @@ import software.amazon.awssdk.services.sqs.model.CreateQueueRequest; import software.amazon.awssdk.services.sqs.model.CreateQueueResponse; import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageResponse; +import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest; +import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse; import software.amazon.awssdk.services.sqs.model.ListQueuesRequest; import software.amazon.awssdk.services.sqs.model.ListQueuesResponse; import software.amazon.awssdk.services.sqs.model.Message; import software.amazon.awssdk.services.sqs.model.PurgeQueueRequest; import software.amazon.awssdk.services.sqs.model.PurgeQueueResponse; +import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; @@ -50,6 +55,7 @@ import software.amazon.awssdk.services.sqs.model.SendMessageRequest; import software.amazon.awssdk.services.sqs.model.SendMessageResponse; import software.amazon.awssdk.services.sqs.model.SetQueueAttributesRequest; import software.amazon.awssdk.services.sqs.model.SetQueueAttributesResponse; +import software.amazon.awssdk.services.sqs.model.SqsException; public class AmazonSQSClientMock implements SqsClient { @@ -235,4 +241,12 @@ public class AmazonSQSClientMock implements SqsClient { // TODO Auto-generated method stub } + + @Override + public GetQueueUrlResponse getQueueUrl(GetQueueUrlRequest getQueueUrlRequest) + throws QueueDoesNotExistException, AwsServiceException, SdkClientException, SqsException { + return GetQueueUrlResponse.builder() + .queueUrl("https://queue.amazonaws.com/queue/camel-836") + .build(); + } }