This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-3.18.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.18.x by this push: new a99a781b716 CAMEL-18968 Camel-aws2-sqs - Queue url might stay empty for the delayed queue. a99a781b716 is described below commit a99a781b7169d1263ec680779b83fbd721c24c15 Author: JiriOndrusek <ondrusek.j...@gmail.com> AuthorDate: Mon Jan 23 13:57:27 2023 +0100 CAMEL-18968 Camel-aws2-sqs - Queue url might stay empty for the delayed queue. --- .../camel/component/aws2/sqs/Sqs2Endpoint.java | 69 ++++++++++++------- .../component/aws2/sqs/AmazonSQSClientMock.java | 16 ++++- .../aws2/sqs/SqsProducerDelayedQueueuTest.java | 79 ++++++++++++++++++++++ 3 files changed, 136 insertions(+), 28 deletions(-) diff --git a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java index 806216e3481..a51cb819492 100644 --- a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java +++ b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java @@ -65,6 +65,7 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint implements HeaderFilterS private SqsClient client; private String queueUrl; + private boolean queueUrlInitialized; @UriPath(description = "Queue name or ARN") @Metadata(required = true) @@ -151,6 +152,7 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint implements HeaderFilterS if (configuration.getQueueUrl() != null) { queueUrl = configuration.getQueueUrl(); + queueUrlInitialized = true; } else { // If both region and Account ID is provided the queue URL can be // built manually. 
@@ -159,39 +161,16 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint implements HeaderFilterS if (configuration.getRegion() != null && configuration.getQueueOwnerAWSAccountId() != null) { queueUrl = getAwsEndpointUri() + "/" + configuration.getQueueOwnerAWSAccountId() + "/" + configuration.getQueueName(); + queueUrlInitialized = true; } else if (configuration.getQueueOwnerAWSAccountId() != null) { GetQueueUrlRequest.Builder getQueueUrlRequest = GetQueueUrlRequest.builder(); getQueueUrlRequest.queueName(configuration.getQueueName()); getQueueUrlRequest.queueOwnerAWSAccountId(configuration.getQueueOwnerAWSAccountId()); GetQueueUrlResponse getQueueUrlResult = client.getQueueUrl(getQueueUrlRequest.build()); queueUrl = getQueueUrlResult.queueUrl(); + queueUrlInitialized = true; } else { - // check whether the queue already exists - String queueNamePath = "/" + configuration.getQueueName(); - ListQueuesRequest.Builder listQueuesRequestBuilder - = ListQueuesRequest.builder().maxResults(1000).queueNamePrefix(configuration.getQueueName()); - - for (;;) { - ListQueuesResponse listQueuesResult = client.listQueues(listQueuesRequestBuilder.build()); - for (String url : listQueuesResult.queueUrls()) { - if (url.endsWith(queueNamePath)) { - queueUrl = url; - LOG.trace("Queue available at '{}'.", queueUrl); - break; - } - } - - if (queueUrl != null) { - break; - } - - String token = listQueuesResult.nextToken(); - if (token == null) { - break; - } - - listQueuesRequestBuilder = listQueuesRequestBuilder.nextToken(token); - } + initQueueUrl(); } } @@ -203,6 +182,36 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint implements HeaderFilterS } } + private void initQueueUrl() { + // check whether the queue already exists + String queueNamePath = "/" + configuration.getQueueName(); + ListQueuesRequest.Builder listQueuesRequestBuilder + = ListQueuesRequest.builder().maxResults(1000).queueNamePrefix(configuration.getQueueName()); + + for (;;) { + ListQueuesResponse 
listQueuesResult = client.listQueues(listQueuesRequestBuilder.build()); + for (String url : listQueuesResult.queueUrls()) { + if (url.endsWith(queueNamePath)) { + queueUrl = url; + LOG.trace("Queue available at '{}'.", queueUrl); + break; + } + } + + if (queueUrl != null) { + queueUrlInitialized = true; + break; + } + + String token = listQueuesResult.nextToken(); + if (token == null) { + break; + } + + listQueuesRequestBuilder = listQueuesRequestBuilder.nextToken(token); + } + } + private boolean queueExists(SqsClient client) { LOG.trace("Checking if queue '{}' exists", configuration.getQueueName()); @@ -366,7 +375,15 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint implements HeaderFilterS this.client = client; } + /** + * If queue does not exist during endpoint initialization, the queueUrl has to be initialized again. See + * https://issues.apache.org/jira/browse/CAMEL-18968 for more details. + */ protected String getQueueUrl() { + if (!queueUrlInitialized) { + LOG.trace("Queue url was not initialized during the start of the component. 
Initializing again."); + initQueueUrl(); + } return queueUrl; } diff --git a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java index c50ea3b10a1..1a06a45bff7 100644 --- a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java +++ b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java @@ -68,6 +68,7 @@ public class AmazonSQSClientMock implements SqsClient { private Map<String, ScheduledFuture<?>> inFlight = new LinkedHashMap<>(); private ScheduledExecutorService scheduler; private String queueName; + private boolean verifyQueueUrl; public AmazonSQSClientMock() { } @@ -88,8 +89,8 @@ public class AmazonSQSClientMock implements SqsClient { if (queueName != null) { queues.add("/" + queueName); } else { - queues.add("queue1"); - queues.add("queue2"); + queues.add("/queue1"); + queues.add("/queue2"); } result.queueUrls(queues); return result.build(); @@ -106,6 +107,9 @@ public class AmazonSQSClientMock implements SqsClient { @Override public SendMessageResponse sendMessage(SendMessageRequest sendMessageRequest) { + if (verifyQueueUrl && sendMessageRequest.queueUrl() == null) { + throw new RuntimeException("QueueUrl can not be null."); + } Message.Builder message = Message.builder(); message.body(sendMessageRequest.messageBody()); message.md5OfBody("6a1559560f67c5e7a7d5d838bf0272ee"); @@ -272,4 +276,12 @@ public class AmazonSQSClientMock implements SqsClient { .queueUrl("https://queue.amazonaws.com/queue/camel-836") .build(); } + + public void setVerifyQueueUrl(boolean verifyQueueUrl) { + this.verifyQueueUrl = verifyQueueUrl; + } + + public void setQueueName(String queueName) { + this.queueName = queueName; + } } diff --git 
a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsProducerDelayedQueueuTest.java b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsProducerDelayedQueueuTest.java new file mode 100644 index 00000000000..ae821b9e016 --- /dev/null +++ b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsProducerDelayedQueueuTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.aws2.sqs; + +import org.apache.camel.BindToRegistry; +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class SqsProducerDelayedQueueuTest extends CamelTestSupport { + + @BindToRegistry("client") + AmazonSQSClientMock mock = new AmazonSQSClientMock(); + + @EndpointInject("direct:start") + private ProducerTemplate template; + + @EndpointInject("mock:result") + private MockEndpoint result; + + @Test + public void testDelayed() throws Exception { + mock.setVerifyQueueUrl(true); + result.expectedMessageCount(1); + + //should fail, because queue3 is not registered in client + template.send("direct:start", new Processor() { + @Override + public void process(Exchange exchange) { + exchange.getIn().setBody("Hi from sqs 1"); + } + }); + //adding queue3 later (delayed) + mock.setQueueName("queue3"); + + template.send("direct:start", new Processor() { + @Override + public void process(Exchange exchange) { + exchange.getIn().setBody("Hi from sqs 2"); + } + }); + + MockEndpoint.assertIsSatisfied(context); + String res = result.getExchanges().get(0).getIn().getBody(String.class); + assertEquals("Hi from sqs 2", res); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start").to("aws2-sqs://queue3") + .to("mock:result"); + } + }; + } +}