This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 0f4515f8 CAMEL-17121: converted camel-rabbitmq reply manager to repeatable tasks 0f4515f8 is described below commit 0f4515f896f0a2425077b2ec9f4aca852277b6d9 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Tue Nov 9 18:11:03 2021 +0100 CAMEL-17121: converted camel-rabbitmq reply manager to repeatable tasks --- .../rabbitmq/reply/ReplyManagerSupport.java | 36 ++++++---------------- 1 file changed, 10 insertions(+), 26 deletions(-) diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java index 6b39178..e8f7972 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.rabbitmq.reply; +import java.time.Duration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -33,6 +34,9 @@ import org.apache.camel.component.rabbitmq.RabbitMQMessageConverter; import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.support.service.ServiceSupport; +import org.apache.camel.support.task.ForegroundTask; +import org.apache.camel.support.task.Tasks; +import org.apache.camel.support.task.budget.Budgets; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -215,33 +219,13 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl LOG.warn("Early reply received with correlationID [{}] -> {}", correlationID, message); } - ReplyHandler answer = null; + ForegroundTask task = Tasks.foregroundTask().withBudget(Budgets.iterationBudget() + .withMaxIterations(50) + .withInterval(Duration.ofMillis(100)) + .build()) + .build(); - // wait up till 5 seconds - boolean done = false; - int counter = 0; - while (!done && counter++ < 50) { - LOG.trace("Early reply not found handler at attempt {}. Waiting a bit longer.", counter); - try { - Thread.sleep(100); - } catch (InterruptedException e) { - // ignore - } - - // try again - answer = correlation.get(correlationID); - done = answer != null; - - if (answer != null) { - if (LOG.isTraceEnabled()) { - LOG.trace( - "Early reply with correlationID [{}] has been matched after {} attempts and can be processed using handler: {}", - correlationID, counter, answer); - } - } - } - - return answer; + return task.run(() -> correlation.get(correlationID), answer -> answer != null).orElse(null); } @Override