This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-4.8.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.8.x by this push: new 78752e30390 CAMEL-21550: start the extender before visibility (#16592) 78752e30390 is described below commit 78752e30390679108955a80f84abcc2eafc1bcd9 Author: Narsi Nallamilli <narsi.nallami...@gmail.com> AuthorDate: Wed Dec 18 14:40:26 2024 +0530 CAMEL-21550: start the extender before visibility (#16592) --- .../camel/component/aws2/sqs/Sqs2Consumer.java | 36 +++++++--------------- .../SqsConsumerExtendMessageVisibilityTest.java | 2 +- 2 files changed, 12 insertions(+), 26 deletions(-) diff --git a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java index 4637725c20a..c170a2c75fa 100644 --- a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java +++ b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java @@ -66,17 +66,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.sqs.SqsClient; -import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest; -import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry; -import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; -import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; -import software.amazon.awssdk.services.sqs.model.MessageNotInflightException; -import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; -import software.amazon.awssdk.services.sqs.model.QueueDeletedRecentlyException; -import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException; -import software.amazon.awssdk.services.sqs.model.ReceiptHandleIsInvalidException; -import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; -import software.amazon.awssdk.services.sqs.model.SqsException; +import software.amazon.awssdk.services.sqs.model.*; import static java.util.Collections.emptyList; import static java.util.Collections.unmodifiableList; @@ -311,17 +301,18 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer { Integer visibilityTimeout = getConfiguration().getVisibilityTimeout(); if (visibilityTimeout != null && visibilityTimeout > 0) { - int delay = visibilityTimeout; + int initialDelay = visibilityTimeout / 2; + int period = visibilityTimeout; int repeatSeconds = (int) (visibilityTimeout.doubleValue() * 1.5); this.timeoutExtender = new TimeoutExtender(repeatSeconds); if (LOG.isDebugEnabled()) { LOG.debug( "Scheduled TimeoutExtender task to start after {} delay, and run with {}/{} period/repeat (seconds)", - delay, delay, repeatSeconds); + initialDelay, period, repeatSeconds); } this.scheduledFuture - = scheduledExecutor.scheduleAtFixedRate(this.timeoutExtender, delay, delay, TimeUnit.SECONDS); + = scheduledExecutor.scheduleAtFixedRate(this.timeoutExtender, initialDelay, period, TimeUnit.SECONDS); } } @@ -415,18 +406,13 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer { try { LOG.trace("Extending visibility window by {} seconds for request entries {}", repeatSeconds, batchEntries); - getEndpoint().getClient().changeMessageVisibilityBatch(request); - LOG.debug("Extended visibility window for request entries {}", batchEntries); - } catch (MessageNotInflightException | ReceiptHandleIsInvalidException e) { - // Ignore. + ChangeMessageVisibilityBatchResponse br + = getEndpoint().getClient().changeMessageVisibilityBatch(request); + LOG.debug("Extended visibility window for request entries successful {}", br.successful()); + LOG.debug("Extended visibility window for request entries failed {}", br.failed()); } catch (SqsException e) { - if (e.getMessage() - .contains("Message does not exist or is not available for visibility timeout change")) { - // Ignore. - } else { - logException(e, batchEntries); - } - } catch (Exception e) { + logException(e, batchEntries); + } catch (SdkException e) { logException(e, batchEntries); } } diff --git a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java index 282ed85e76f..1c2d2aea695 100644 --- a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java +++ b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java @@ -46,7 +46,7 @@ public class SqsConsumerExtendMessageVisibilityTest extends CamelTestSupport { @Override public void process(Exchange exchange) throws Exception { // Simulate message that takes a while to receive. - Thread.sleep(TIMEOUT * 1500L); // 150% of TIMEOUT. + Thread.sleep(TIMEOUT * 3000L); // 150% of TIMEOUT. } });