This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 9a29ad738c5 CAMEL-19656 batch visibility extender task (#11102) 9a29ad738c5 is described below commit 9a29ad738c5514d1ea9dc8975b5ab353333eac21 Author: Jono Morris <jono.mor...@xtra.co.nz> AuthorDate: Fri Aug 18 00:59:21 2023 +1200 CAMEL-19656 batch visibility extender task (#11102) * CAMEL-19656 batch visibility extender task * CAMEL-19656 run extender as single background task * CAMEL-19656 new list instance for each request * CAMEL-19656 remove exchange from extender when complete * CAMEL-19656 update logging collection reference --- .../camel/component/aws2/sqs/Sqs2Consumer.java | 155 +++++++++++++-------- .../component/aws2/sqs/AmazonSQSClientMock.java | 16 +-- .../SqsConsumerExtendMessageVisibilityTest.java | 4 +- .../sqs/SqsDoesNotExtendMessageVisibilityTest.java | 2 +- 4 files changed, 105 insertions(+), 72 deletions(-) diff --git a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java index 4515323cca3..ee037337e5e 100644 --- a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java +++ b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java @@ -23,6 +23,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -47,7 +48,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.services.sqs.SqsClient; -import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry; import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; import software.amazon.awssdk.services.sqs.model.MessageNotInflightException; @@ -66,6 +68,8 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer { private static final Logger LOG = LoggerFactory.getLogger(Sqs2Consumer.class); + private TimeoutExtender timeoutExtender; + private ScheduledFuture<?> scheduledFuture; private ScheduledExecutorService scheduledExecutor; private transient String sqsConsumerToString; private Collection<String> attributeNames; @@ -169,44 +173,8 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer { // update pending number of exchanges pendingExchanges = total - index - 1; - // schedule task to extend visibility if enabled - Integer visibilityTimeout = getConfiguration().getVisibilityTimeout(); - if (this.scheduledExecutor != null && visibilityTimeout != null && (visibilityTimeout.intValue() / 2) > 0) { - int delay = visibilityTimeout.intValue() / 2; - int period = visibilityTimeout.intValue(); - int repeatSeconds = (int) (visibilityTimeout.doubleValue() * 1.5); - if (LOG.isDebugEnabled()) { - LOG.debug( - "Scheduled TimeoutExtender task to start after {} delay, and run with {}/{} period/repeat (seconds), to extend exchangeId: {}", - delay, period, - repeatSeconds, exchange.getExchangeId()); - } - final TimeoutExtender extender = new TimeoutExtender(exchange, repeatSeconds); - final ScheduledFuture<?> scheduledFuture - = this.scheduledExecutor.scheduleAtFixedRate(extender, delay, period, TimeUnit.SECONDS); - exchange.getExchangeExtension().addOnCompletion(new Synchronization() { - @Override - public void onComplete(Exchange exchange) { - cancelExtender(exchange); - } - - @Override - public void onFailure(Exchange exchange) { - cancelExtender(exchange); - } - - private void cancelExtender(Exchange exchange) { - // cancel task as we are done - LOG.trace("Processing done so cancelling TimeoutExtender task for exchangeId: {}", - exchange.getExchangeId()); - extender.cancel(); - boolean cancelled = scheduledFuture.cancel(true); - if (!cancelled) { - LOG.warn("TimeoutExtender task for exchangeId: {} could not be cancelled", - exchange.getExchangeId()); - } - } - }); + if (this.timeoutExtender != null) { + timeoutExtender.add(exchange); } // add on completion to handle after work when the exchange is done @@ -370,6 +338,22 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer { this.scheduledExecutor = getEndpoint().getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this, "SqsTimeoutExtender", profile); + + Integer visibilityTimeout = getConfiguration().getVisibilityTimeout(); + + if (visibilityTimeout != null && visibilityTimeout > 0) { + int delay = visibilityTimeout; + int repeatSeconds = (int) (visibilityTimeout.doubleValue() * 1.5); + this.timeoutExtender = new TimeoutExtender(repeatSeconds); + + if (LOG.isDebugEnabled()) { + LOG.debug( + "Scheduled TimeoutExtender task to start after {} delay, and run with {}/{} period/repeat (seconds)", + delay, delay, repeatSeconds); + } + this.scheduledFuture + = scheduledExecutor.scheduleAtFixedRate(this.timeoutExtender, delay, delay, TimeUnit.SECONDS); + } } super.doStart(); @@ -377,6 +361,16 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer { @Override protected void doShutdown() throws Exception { + if (timeoutExtender != null) { + timeoutExtender.cancel(); + timeoutExtender = null; + } + + if (scheduledFuture != null) { + scheduledFuture.cancel(true); + scheduledFuture = null; + } + if (scheduledExecutor != null) { getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(scheduledExecutor); scheduledExecutor = null; @@ -387,15 +381,43 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer { private class TimeoutExtender implements Runnable { - private final Exchange exchange; + private static final int MAX_REQUESTS = 10; private final int repeatSeconds; private final AtomicBoolean run = new AtomicBoolean(true); + private final Map<String, ChangeMessageVisibilityBatchRequestEntry> entries = new ConcurrentHashMap<>(); - TimeoutExtender(Exchange exchange, int repeatSeconds) { - this.exchange = exchange; + TimeoutExtender(int repeatSeconds) { this.repeatSeconds = repeatSeconds; } + public void add(Exchange exchange) { + exchange.getExchangeExtension().addOnCompletion(new Synchronization() { + @Override + public void onComplete(Exchange exchange) { + remove(exchange); + } + + @Override + public void onFailure(Exchange exchange) { + remove(exchange); + } + + private void remove(Exchange exchange) { + LOG.trace("Removing exchangeId {} from the TimeoutExtender, processing done", + exchange.getExchangeId()); + entries.remove(exchange.getExchangeId()); + } + }); + + ChangeMessageVisibilityBatchRequestEntry entry + = ChangeMessageVisibilityBatchRequestEntry.builder() + .id(exchange.getExchangeId()).visibilityTimeout(repeatSeconds) + .receiptHandle(exchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE, String.class)) + .build(); + + entries.put(exchange.getExchangeId(), entry); + } + public void cancel() { // cancel by setting to no longer run run.set(false); @@ -404,32 +426,43 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer { @Override public void run() { if (run.get()) { - ChangeMessageVisibilityRequest.Builder request - = ChangeMessageVisibilityRequest.builder().queueUrl(getQueueUrl()).visibilityTimeout(repeatSeconds) - .receiptHandle(exchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE, String.class)); - - try { - LOG.trace("Extending visibility window by {} seconds for exchange {}", this.repeatSeconds, this.exchange); - getEndpoint().getClient().changeMessageVisibility(request.build()); - LOG.debug("Extended visibility window by {} seconds for exchange {}", this.repeatSeconds, this.exchange); - } catch (MessageNotInflightException | ReceiptHandleIsInvalidException e) { - // Ignore. - } catch (SqsException e) { - if (e.getMessage().contains("Message does not exist or is not available for visibility timeout change")) { + Queue<ChangeMessageVisibilityBatchRequestEntry> entryQueue = new LinkedList<>(entries.values()); + + while (!entryQueue.isEmpty()) { + List<ChangeMessageVisibilityBatchRequestEntry> batchEntries = new LinkedList<>(); + // up to 10 requests can be sent with each ChangeMessageVisibilityBatch action + while (!entryQueue.isEmpty() && batchEntries.size() < MAX_REQUESTS) { + batchEntries.add(entryQueue.poll()); + } + + ChangeMessageVisibilityBatchRequest request + = ChangeMessageVisibilityBatchRequest.builder().queueUrl(getQueueUrl()).entries(batchEntries) + .build(); + + try { + LOG.trace("Extending visibility window by {} seconds for request entries {}", repeatSeconds, batchEntries); + getEndpoint().getClient().changeMessageVisibilityBatch(request); + LOG.debug("Extended visibility window for request entries {}", batchEntries); + } catch (MessageNotInflightException | ReceiptHandleIsInvalidException e) { // Ignore. - } else { - logException(e); + } catch (SqsException e) { + if (e.getMessage() + .contains("Message does not exist or is not available for visibility timeout change")) { + // Ignore. + } else { + logException(e, batchEntries); + } + } catch (Exception e) { + logException(e, batchEntries); } - } catch (Exception e) { - logException(e); } } } - private void logException(Exception e) { - LOG.warn("Extending visibility window failed for exchange {}" + private void logException(Exception e, List<ChangeMessageVisibilityBatchRequestEntry> entries) { + LOG.warn("Extending visibility window failed for entries {}" + ". Will not attempt to extend visibility further. This exception will be ignored.", - exchange, e); + entries, e); } } diff --git a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java index 3d68ed19002..7d3affb8cf7 100644 --- a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java +++ b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java @@ -33,8 +33,8 @@ import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.SqsServiceClientConfiguration; import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; -import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest; -import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityResponse; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchResponse; import software.amazon.awssdk.services.sqs.model.CreateQueueRequest; import software.amazon.awssdk.services.sqs.model.CreateQueueResponse; import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; @@ -63,7 +63,7 @@ public class AmazonSQSClientMock implements SqsClient { List<Message> messages = new ArrayList<>(); Map<String, Map<String, String>> queueAttributes = new HashMap<>(); - List<ChangeMessageVisibilityRequest> changeMessageVisibilityRequests = new CopyOnWriteArrayList<>(); + List<ChangeMessageVisibilityBatchRequest> changeMessageVisibilityBatchRequests = new CopyOnWriteArrayList<>(); private Map<String, CreateQueueRequest> queues = new LinkedHashMap<>(); private Map<String, ScheduledFuture<?>> inFlight = new LinkedHashMap<>(); private ScheduledExecutorService scheduler; @@ -232,12 +232,12 @@ public class AmazonSQSClientMock implements SqsClient { public SqsServiceClientConfiguration serviceClientConfiguration() { return null; } - + @Override - public ChangeMessageVisibilityResponse changeMessageVisibility( - ChangeMessageVisibilityRequest changeMessageVisibilityRequest) { - this.changeMessageVisibilityRequests.add(changeMessageVisibilityRequest); - return ChangeMessageVisibilityResponse.builder().build(); + public ChangeMessageVisibilityBatchResponse changeMessageVisibilityBatch( + ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest) { + this.changeMessageVisibilityBatchRequests.add(changeMessageVisibilityBatchRequest); + return ChangeMessageVisibilityBatchResponse.builder().build(); } @Override diff --git a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java index e972cd3524c..1ecb28ae27d 100644 --- a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java +++ b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java @@ -60,8 +60,8 @@ public class SqsConsumerExtendMessageVisibilityTest extends CamelTestSupport { // Wait for message to arrive. MockEndpoint.assertIsSatisfied(context); - assertTrue(this.client.changeMessageVisibilityRequests.size() >= 1); - assertTrue(this.client.changeMessageVisibilityRequests.size() <= 3); + assertTrue(this.client.changeMessageVisibilityBatchRequests.size() >= 1); + assertTrue(this.client.changeMessageVisibilityBatchRequests.size() <= 3); } @Override diff --git a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsDoesNotExtendMessageVisibilityTest.java b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsDoesNotExtendMessageVisibilityTest.java index 1b5414bfa95..83b7b61f9d6 100644 --- a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsDoesNotExtendMessageVisibilityTest.java +++ b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsDoesNotExtendMessageVisibilityTest.java @@ -59,7 +59,7 @@ public class SqsDoesNotExtendMessageVisibilityTest extends CamelTestSupport { // Wait for message to arrive. MockEndpoint.assertIsSatisfied(context); - assertEquals(0, this.client.changeMessageVisibilityRequests.size()); + assertEquals(0, this.client.changeMessageVisibilityBatchRequests.size()); } @Override