orpiske commented on code in PR #14926:
URL: https://github.com/apache/camel/pull/14926#discussion_r1694777569


##########
components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java:
##########
@@ -473,4 +438,370 @@ private void logException(Exception e, List<ChangeMessageVisibilityBatchRequestE
         }
     }
 
+    /**
+     * Task responsible for polling the messages from Amazon SQS server.
+     *
+     * Depending on the configuration, the polling may involve sending one or more receive requests in a single task
+     * call. The number of send requests depends on the {@link Sqs2Endpoint#getMaxMessagesPerPoll()} configuration. The
+     * Amazon SQS receive API has upper limit of maximum 10 messages that can be fetched with a single request. To
+     * enable handling greater number of messages fetched per poll, multiple requests are being sent asynchronously and
+     * then joined together.
+     *
+     * To preserve the ordering, an optional {@link Sqs2Configuration#getSortAttributeName()} can be configured. When
+     * specified, all messages collected from the concurrent requests are being sorted using this attribute.
+     *
+     * In addition to that, the task is also responsible for handling auto-creation of the SQS queue, when it's missing.
+     * The queue is created when receive request returns an error about the missing queue and the
+     * {@link Sqs2Configuration#isAutoCreateQueue()} is enabled. In such case, the queue will be created and the task
+     * will return empty list of messages.
+     *
+     * If the queue creation fails with an error related to recently deleted queue, the queue creation will be postponed
+     * for at least 30 seconds. To prevent task from blocking the consumer thread, the 30 second timeout is being
+     * checked in each task call. If the scheduled time for queue auto-creation was not reached yet, the task will
+     * simply return empty list of messages. Once the scheduled time is reached, another queue creation attempt will be
+     * made.
+     */
+    private static class PollingTask implements Callable<List<software.amazon.awssdk.services.sqs.model.Message>>, Closeable {
+        /**
+         * The maximum number of messages that can be requested in a single request to AWS SQS.
+         */
+        private static final int MAX_NUMBER_OF_MESSAGES_PER_REQUEST = 10;
+
+        /**
+         * The time to wait before re-creating recently deleted queue.
+         */
+        private static final long RECENTLY_DELETED_QUEUE_BACKOFF_TIME_MS = 30_000L;
+
+        private static final Pattern COMMA_SEPARATED_PATTERN = Pattern.compile(",");
+
+        /**
+         * A scheduled time for queue auto-creation, measured with {@link Clock#elapsed()} value. The value of
+         *
+         * <pre>
+         * 0
+         * </pre>
+         *
+         * means there is no schedule.
+         */
+        private final AtomicLong queueAutoCreationScheduleTime = new AtomicLong(0L);
+        private final Object mutex = new Object();
+        private final AtomicBoolean closed = new AtomicBoolean();
+
+        private final Clock clock;
+        private final SqsClient sqsClient;
+        private final ExecutorService requestExecutor;
+        private final ExecutorServiceManager executorServiceManager;
+        private final IOConsumer<SqsClient> createQueueOperation;
+
+        private final String queueName;
+        private final String queueUrl;
+        private final int maxMessagesPerPoll;
+        private final Integer visibilityTimeout;
+        private final Integer waitTimeSeconds;
+        private final Collection<MessageSystemAttributeName> attributeNames;
+        private final Collection<String> messageAttributeNames;
+        private final int numberOfRequestsPerPoll;
+        private final boolean queueAutoCreationEnabled;
+        private final MessageSystemAttributeName sortAttributeName;
+
+        @SuppressWarnings("resource")
+        private PollingTask(Sqs2Endpoint endpoint) {
+            clock = endpoint.getCamelContext().getClock();

Review Comment:
   I think it would be better to avoid the CamelContext clock here. Instead, create a new one:
   `clock = new MonotonicClock()`.
   
   There are 2 reasons for that:
   
   1. `endpoint.getCamelContext().getClock()` returns the CamelContext's own clock, which is used to track events for the Camel context itself.
   2. It's slower, because the context's clock has a more complex implementation (i.e., it needs to be able to track multiple events, etc.).
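   
   To illustrate the idea, here is a minimal, self-contained sketch of a task-local elapsed-time clock driving the 30 second backoff check. It is not Camel's `MonotonicClock` and not the code from this PR: `LocalElapsedClock` and `QueueCreationBackoff` are hypothetical names, and the clock is assumed to be a thin wrapper over `System.nanoTime()`.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/**
 * Hypothetical stand-in for a dedicated monotonic clock (NOT Camel's MonotonicClock).
 * It only measures the time elapsed since its own creation.
 */
final class LocalElapsedClock {
    private final LongSupplier nanoSource;
    private final long createdNanos;

    LocalElapsedClock() {
        this(System::nanoTime);
    }

    // Accepting the time source lets callers test the clock without sleeping.
    LocalElapsedClock(LongSupplier nanoSource) {
        this.nanoSource = nanoSource;
        this.createdNanos = nanoSource.getAsLong();
    }

    /** Milliseconds elapsed since this clock was created. */
    long elapsed() {
        return (nanoSource.getAsLong() - createdNanos) / 1_000_000L;
    }
}

/** Hypothetical helper showing the non-blocking backoff check described in the Javadoc. */
final class QueueCreationBackoff {
    static final long BACKOFF_MS = 30_000L;

    private final LocalElapsedClock clock;
    // 0 means "nothing scheduled", mirroring the convention in the quoted Javadoc.
    private final AtomicLong scheduleTime = new AtomicLong(0L);

    QueueCreationBackoff(LocalElapsedClock clock) {
        this.clock = clock;
    }

    /** Postpone the next creation attempt by the backoff time. */
    void postpone() {
        scheduleTime.set(clock.elapsed() + BACKOFF_MS);
    }

    /** True when no attempt is scheduled or the scheduled time has been reached. */
    boolean mayCreateNow() {
        long scheduled = scheduleTime.get();
        return scheduled == 0L || clock.elapsed() >= scheduled;
    }
}
```

   Because the clock belongs to the task, the elapsed values stay independent of whatever the context-level clock is tracking, and each poll only compares two `long` values instead of blocking the consumer thread.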
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.