This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-4.4.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.4.x by this push: new 945bb0a8d83 CAMEL-21550: start the extender before visibility (#16592) 945bb0a8d83 is described below commit 945bb0a8d838e98a7016cf423d82c340d911528f Author: Narsi Nallamilli <narsi.nallami...@gmail.com> AuthorDate: Wed Dec 18 14:40:26 2024 +0530 CAMEL-21550: start the extender before visibility (#16592) --- .../camel/component/aws2/sqs/Sqs2Consumer.java | 36 +++++++--------------- .../SqsConsumerExtendMessageVisibilityTest.java | 2 +- 2 files changed, 12 insertions(+), 26 deletions(-) diff --git a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java index 1ff99a1a63f..019f1dd51e9 100644 --- a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java +++ b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java @@ -48,17 +48,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.sqs.SqsClient; -import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest; -import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry; -import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; -import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; -import software.amazon.awssdk.services.sqs.model.MessageNotInflightException; -import software.amazon.awssdk.services.sqs.model.QueueDeletedRecentlyException; -import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException; -import software.amazon.awssdk.services.sqs.model.ReceiptHandleIsInvalidException; -import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; -import 
software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; -import software.amazon.awssdk.services.sqs.model.SqsException; +import software.amazon.awssdk.services.sqs.model.*; /** * A Consumer of messages from the Amazon Web Service Simple Queue Service <a href="http://aws.amazon.com/sqs/">AWS @@ -348,17 +338,18 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer { Integer visibilityTimeout = getConfiguration().getVisibilityTimeout(); if (visibilityTimeout != null && visibilityTimeout > 0) { - int delay = visibilityTimeout; + int initialDelay = visibilityTimeout / 2; + int period = visibilityTimeout; int repeatSeconds = (int) (visibilityTimeout.doubleValue() * 1.5); this.timeoutExtender = new TimeoutExtender(repeatSeconds); if (LOG.isDebugEnabled()) { LOG.debug( "Scheduled TimeoutExtender task to start after {} delay, and run with {}/{} period/repeat (seconds)", - delay, delay, repeatSeconds); + initialDelay, period, repeatSeconds); } this.scheduledFuture - = scheduledExecutor.scheduleAtFixedRate(this.timeoutExtender, delay, delay, TimeUnit.SECONDS); + = scheduledExecutor.scheduleAtFixedRate(this.timeoutExtender, initialDelay, period, TimeUnit.SECONDS); } } @@ -448,18 +439,13 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer { try { LOG.trace("Extending visibility window by {} seconds for request entries {}", repeatSeconds, batchEntries); - getEndpoint().getClient().changeMessageVisibilityBatch(request); - LOG.debug("Extended visibility window for request entries {}", batchEntries); - } catch (MessageNotInflightException | ReceiptHandleIsInvalidException e) { - // Ignore. 
+ ChangeMessageVisibilityBatchResponse br + = getEndpoint().getClient().changeMessageVisibilityBatch(request); + LOG.debug("Extended visibility window for request entries successful {}", br.successful()); + LOG.debug("Extended visibility window for request entries failed {}", br.failed()); } catch (SqsException e) { - if (e.getMessage() - .contains("Message does not exist or is not available for visibility timeout change")) { - // Ignore. - } else { - logException(e, batchEntries); - } - } catch (Exception e) { + logException(e, batchEntries); + } catch (SdkException e) { logException(e, batchEntries); } } diff --git a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java index 1ecb28ae27d..6eb41f34762 100644 --- a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java +++ b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java @@ -46,7 +46,7 @@ public class SqsConsumerExtendMessageVisibilityTest extends CamelTestSupport { @Override public void process(Exchange exchange) throws Exception { // Simulate message that takes a while to receive. - Thread.sleep(TIMEOUT * 1500L); // 150% of TIMEOUT. + Thread.sleep(TIMEOUT * 3000L); // 300% of TIMEOUT. } });