This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-4.8.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-4.8.x by this push:
     new 78752e30390 CAMEL-21550: start the extender before visibility (#16592)
78752e30390 is described below

commit 78752e30390679108955a80f84abcc2eafc1bcd9
Author: Narsi Nallamilli <narsi.nallami...@gmail.com>
AuthorDate: Wed Dec 18 14:40:26 2024 +0530

    CAMEL-21550: start the extender before visibility (#16592)
---
 .../camel/component/aws2/sqs/Sqs2Consumer.java     | 36 +++++++---------------
 .../SqsConsumerExtendMessageVisibilityTest.java    |  2 +-
 2 files changed, 12 insertions(+), 26 deletions(-)

diff --git a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
index 4637725c20a..c170a2c75fa 100644
--- a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
+++ b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
@@ -66,17 +66,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.core.exception.SdkException;
 import software.amazon.awssdk.services.sqs.SqsClient;
-import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest;
-import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry;
-import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
-import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
-import software.amazon.awssdk.services.sqs.model.MessageNotInflightException;
-import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
-import software.amazon.awssdk.services.sqs.model.QueueDeletedRecentlyException;
-import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;
-import software.amazon.awssdk.services.sqs.model.ReceiptHandleIsInvalidException;
-import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
-import software.amazon.awssdk.services.sqs.model.SqsException;
+import software.amazon.awssdk.services.sqs.model.*;
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.unmodifiableList;
@@ -311,17 +301,18 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer {
             Integer visibilityTimeout = getConfiguration().getVisibilityTimeout();
 
             if (visibilityTimeout != null && visibilityTimeout > 0) {
-                int delay = visibilityTimeout;
+                int initialDelay = visibilityTimeout / 2;
+                int period = visibilityTimeout;
                 int repeatSeconds = (int) (visibilityTimeout.doubleValue() * 1.5);
                 this.timeoutExtender = new TimeoutExtender(repeatSeconds);
 
                 if (LOG.isDebugEnabled()) {
                     LOG.debug(
                             "Scheduled TimeoutExtender task to start after {} delay, and run with {}/{} period/repeat (seconds)",
-                            delay, delay, repeatSeconds);
+                            initialDelay, period, repeatSeconds);
                 }
                 this.scheduledFuture
-                        = scheduledExecutor.scheduleAtFixedRate(this.timeoutExtender, delay, delay, TimeUnit.SECONDS);
+                        = scheduledExecutor.scheduleAtFixedRate(this.timeoutExtender, initialDelay, period, TimeUnit.SECONDS);
             }
         }
 
@@ -415,18 +406,13 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer {
                     try {
                         LOG.trace("Extending visibility window by {} seconds for request entries {}", repeatSeconds,
                                 batchEntries);
-                        getEndpoint().getClient().changeMessageVisibilityBatch(request);
-                        LOG.debug("Extended visibility window for request entries {}", batchEntries);
-                    } catch (MessageNotInflightException | ReceiptHandleIsInvalidException e) {
-                        // Ignore.
+                        ChangeMessageVisibilityBatchResponse br
+                                = getEndpoint().getClient().changeMessageVisibilityBatch(request);
+                        LOG.debug("Extended visibility window for request entries successful {}", br.successful());
+                        LOG.debug("Extended visibility window for request entries failed {}", br.failed());
                     } catch (SqsException e) {
-                        if (e.getMessage()
-                                .contains("Message does not exist or is not available for visibility timeout change")) {
-                            // Ignore.
-                        } else {
-                            logException(e, batchEntries);
-                        }
-                    } catch (Exception e) {
+                        logException(e, batchEntries);
+                    } catch (SdkException e) {
                         logException(e, batchEntries);
                     }
                 }
diff --git a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java
index 282ed85e76f..1c2d2aea695 100644
--- a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java
+++ b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java
@@ -46,7 +46,7 @@ public class SqsConsumerExtendMessageVisibilityTest extends CamelTestSupport {
             @Override
             public void process(Exchange exchange) throws Exception {
                 // Simulate message that takes a while to receive.
-                Thread.sleep(TIMEOUT * 1500L); // 150% of TIMEOUT.
+                Thread.sleep(TIMEOUT * 3000L); // 300% of TIMEOUT.
             }
         });
 

Reply via email to