This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 07b9c68ea2a9a70cf3a3f42c1045fb9fd0acbe6a Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Sat Feb 11 22:25:01 2023 +0100 CAMEL-19058: check StatefulServiceInitialization just once This avoids both the type checking and the initialization status on the hot path --- .../camel/support/cache/DefaultProducerCache.java | 35 -------------------- .../apache/camel/support/cache/ServicePool.java | 38 ++++++++++++++++++++++ 2 files changed, 38 insertions(+), 35 deletions(-) diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java index d85fe95105c..17612316e62 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java @@ -16,7 +16,6 @@ */ package org.apache.camel.support.cache; -import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.RejectedExecutionException; @@ -31,7 +30,6 @@ import org.apache.camel.ExchangePropertyKey; import org.apache.camel.FailedToCreateProducerException; import org.apache.camel.Processor; import org.apache.camel.Producer; -import org.apache.camel.StatefulService; import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.ProducerCache; import org.apache.camel.spi.SharedInternalProcessor; @@ -40,10 +38,6 @@ import org.apache.camel.support.DefaultEndpointUtilizationStatistics; import org.apache.camel.support.EventHelper; import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.support.service.ServiceSupport; -import org.apache.camel.support.task.BlockingTask; -import org.apache.camel.support.task.Tasks; -import org.apache.camel.support.task.budget.Budgets; -import org.apache.camel.support.task.budget.IterationBoundedBudget; import org.apache.camel.util.StopWatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,9 +46,7 @@ import org.slf4j.LoggerFactory; * Default implementation of {@link ProducerCache}. */ public class DefaultProducerCache extends ServiceSupport implements ProducerCache { - private static final Logger LOG = LoggerFactory.getLogger(DefaultProducerCache.class); - private static final long ACQUIRE_WAIT_TIME = 30000; private final CamelContext camelContext; private final ProducerServicePool producers; @@ -125,24 +117,6 @@ public class DefaultProducerCache extends ServiceSupport implements ProducerCach return source; } - private void waitForService(StatefulService service) { - BlockingTask task = Tasks.foregroundTask().withBudget(Budgets.iterationTimeBudget() - .withMaxIterations(IterationBoundedBudget.UNLIMITED_ITERATIONS) - .withMaxDuration(Duration.ofMillis(ACQUIRE_WAIT_TIME)) - .withInterval(Duration.ofMillis(5)) - .build()) - .build(); - - if (!task.run(service::isStarting)) { - LOG.warn("The producer: {} did not finish starting in {} ms", service, ACQUIRE_WAIT_TIME); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Waited {} ms for producer to finish starting: {} state: {}", task.elapsed().toMillis(), service, - service.getStatus()); - } - } - @Override public AsyncProducer acquireProducer(Endpoint endpoint) { try { @@ -150,15 +124,6 @@ public class DefaultProducerCache extends ServiceSupport implements ProducerCach if (statistics != null) { statistics.onHit(endpoint.getEndpointUri()); } - - // if producer is starting then wait for it to be ready - if (producer instanceof StatefulService) { - StatefulService ss = (StatefulService) producer; - if (ss.isStarting()) { - LOG.trace("Waiting for producer to finish starting: {}", producer); - waitForService(ss); - } - } return producer; } catch (Throwable e) { throw new FailedToCreateProducerException(endpoint, e); diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java index fe0b6fe6cf9..6017d1cbb97 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java @@ -16,6 +16,7 @@ */ package org.apache.camel.support.cache; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -28,9 +29,14 @@ import java.util.function.Function; import org.apache.camel.Endpoint; import org.apache.camel.NonManagedService; import org.apache.camel.Service; +import org.apache.camel.StatefulService; import org.apache.camel.support.LRUCache; import org.apache.camel.support.LRUCacheFactory; import org.apache.camel.support.service.ServiceSupport; +import org.apache.camel.support.task.BlockingTask; +import org.apache.camel.support.task.Tasks; +import org.apache.camel.support.task.budget.Budgets; +import org.apache.camel.support.task.budget.IterationBoundedBudget; import org.apache.camel.util.function.ThrowingFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -124,6 +130,24 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements return s; } + private void waitForService(StatefulService service) { + BlockingTask task = Tasks.foregroundTask().withBudget(Budgets.iterationTimeBudget() + .withMaxIterations(IterationBoundedBudget.UNLIMITED_ITERATIONS) + .withMaxDuration(Duration.ofMillis(30000)) + .withInterval(Duration.ofMillis(5)) + .build()) + .build(); + + if (!task.run(service::isStarting)) { + LOG.warn("The producer: {} did not finish starting in {} ms", service, 30000); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Waited {} ms for producer to finish starting: {} state: {}", task.elapsed().toMillis(), service, + service.getStatus()); + } + } + /** * Releases the producer/consumer back to the pool * @@ -237,6 +261,13 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements S tempS = creator.apply(endpoint); endpoint.getCamelContext().addService(tempS, true, true); s = tempS; + + if (s instanceof StatefulService ss) { + if (ss.isStarting()) { + LOG.trace("Waiting for producer to finish starting: {}", s); + waitForService(ss); + } + } } } } @@ -349,6 +380,13 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements if (s == null) { s = creator.apply(endpoint); s.start(); + + if (s instanceof StatefulService ss) { + if (ss.isStarting()) { + LOG.trace("Waiting for producer to finish starting: {}", s); + waitForService(ss); + } + } } } return s;