This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new f6a37d7  CAMEL-17121: converted camel-jms reply manager to repeatable tasks
f6a37d7 is described below

commit f6a37d71228e8f3d8f099899a17927236cdff719
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Tue Nov 9 15:45:40 2021 +0100

    CAMEL-17121: converted camel-jms reply manager to repeatable tasks
---
 .../component/jms/reply/ReplyManagerSupport.java   | 40 +++++++++-------------
 1 file changed, 16 insertions(+), 24 deletions(-)

diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
index 6204398..40d73d0 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.jms.reply;
 
+import java.time.Duration;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
@@ -36,6 +37,9 @@ import org.apache.camel.component.jms.JmsMessageHelper;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.support.task.ForegroundTask;
+import org.apache.camel.support.task.Tasks;
+import org.apache.camel.support.task.budget.Budgets;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -230,33 +234,21 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
             log.warn("Early reply received with correlationID [{}] -> {}", 
correlationID, message);
         }
 
-        ReplyHandler answer = null;
-
         // wait up until configured values
-        boolean done = false;
-        int counter = 0;
-        while (!done && counter++ < 
endpoint.getConfiguration().getWaitForProvisionCorrelationToBeUpdatedCounter()) 
{
-            log.trace("Early reply not found handler at attempt {}. Waiting a 
bit longer.", counter);
-            try {
-                
Thread.sleep(endpoint.getConfiguration().getWaitForProvisionCorrelationToBeUpdatedThreadSleepingTime());
-            } catch (InterruptedException e) {
-                // ignore
-            }
-
-            // try again
-            answer = correlation.get(correlationID);
-            done = answer != null;
+        long interval = 
endpoint.getConfiguration().getWaitForProvisionCorrelationToBeUpdatedThreadSleepingTime();
+        ForegroundTask task = 
Tasks.foregroundTask().withBudget(Budgets.iterationBudget()
+                
.withMaxIterations(endpoint.getConfiguration().getWaitForProvisionCorrelationToBeUpdatedCounter())
+                .withInterval(Duration.ofMillis(interval))
+                .build())
+                .build();
+
+        return task.run(() -> getReplyHandler(correlationID), answer -> answer 
!= null).orElse(null);
+    }
 
-            if (answer != null) {
-                if (log.isTraceEnabled()) {
-                    log.trace(
-                            "Early reply with correlationID [{}] has been 
matched after {} attempts and can be processed using handler: {}",
-                            correlationID, counter, answer);
-                }
-            }
-        }
+    private ReplyHandler getReplyHandler(String correlationID) {
+        log.trace("Early reply not found handler. Waiting a bit longer.");
 
-        return answer;
+        return correlation.get(correlationID);
     }
 
     @Override

Reply via email to