This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new f6a37d7 CAMEL-17121: converted camel-jms reply manager to repeatable tasks f6a37d7 is described below commit f6a37d71228e8f3d8f099899a17927236cdff719 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Tue Nov 9 15:45:40 2021 +0100 CAMEL-17121: converted camel-jms reply manager to repeatable tasks --- .../component/jms/reply/ReplyManagerSupport.java | 40 +++++++++------------- 1 file changed, 16 insertions(+), 24 deletions(-) diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java index 6204398..40d73d0 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.jms.reply; +import java.time.Duration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; @@ -36,6 +37,9 @@ import org.apache.camel.component.jms.JmsMessageHelper; import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.support.service.ServiceSupport; +import org.apache.camel.support.task.ForegroundTask; +import org.apache.camel.support.task.Tasks; +import org.apache.camel.support.task.budget.Budgets; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -230,33 +234,21 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl log.warn("Early reply received with correlationID [{}] -> {}", correlationID, message); } - ReplyHandler answer = null; - // wait up until configured values - boolean done = false; - int counter = 0; - while (!done && counter++ < endpoint.getConfiguration().getWaitForProvisionCorrelationToBeUpdatedCounter()) { - log.trace("Early reply not found handler at attempt {}. Waiting a bit longer.", counter); - try { - Thread.sleep(endpoint.getConfiguration().getWaitForProvisionCorrelationToBeUpdatedThreadSleepingTime()); - } catch (InterruptedException e) { - // ignore - } - - // try again - answer = correlation.get(correlationID); - done = answer != null; + long interval = endpoint.getConfiguration().getWaitForProvisionCorrelationToBeUpdatedThreadSleepingTime(); + ForegroundTask task = Tasks.foregroundTask().withBudget(Budgets.iterationBudget() + .withMaxIterations(endpoint.getConfiguration().getWaitForProvisionCorrelationToBeUpdatedCounter()) + .withInterval(Duration.ofMillis(interval)) + .build()) + .build(); + + return task.run(() -> getReplyHandler(correlationID), answer -> answer != null).orElse(null); + } - if (answer != null) { - if (log.isTraceEnabled()) { - log.trace( - "Early reply with correlationID [{}] has been matched after {} attempts and can be processed using handler: {}", - correlationID, counter, answer); - } - } - } + private ReplyHandler getReplyHandler(String correlationID) { + log.trace("Early reply not found handler. Waiting a bit longer."); - return answer; + return correlation.get(correlationID); } @Override