This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 07c502761c4 CAMEL-20881: Only create DefaultPollingConsumerPollStrategy when needed. (#14558) 07c502761c4 is described below commit 07c502761c4d450f5b2b7bba19ce4f69f507212d Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon Jun 17 14:11:47 2024 +0200 CAMEL-20881: Only create DefaultPollingConsumerPollStrategy when needed. (#14558) --- .../RemoteFilePollingConsumerPollStrategy.java | 12 ++++++++---- .../engine/LimitedPollingConsumerPollStrategy.java | 21 ++++++++++----------- .../support/DefaultPollingConsumerPollStrategy.java | 7 ++----- .../apache/camel/support/ScheduledPollConsumer.java | 7 ++++--- .../apache/camel/support/ScheduledPollEndpoint.java | 2 +- 5 files changed, 25 insertions(+), 24 deletions(-) diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFilePollingConsumerPollStrategy.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFilePollingConsumerPollStrategy.java index ba18b2b886b..222b806637b 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFilePollingConsumerPollStrategy.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFilePollingConsumerPollStrategy.java @@ -19,12 +19,16 @@ package org.apache.camel.component.file.remote; import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.support.DefaultPollingConsumerPollStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Remote file consumer polling strategy that attempts to help recovering from lost connections. */ public class RemoteFilePollingConsumerPollStrategy extends DefaultPollingConsumerPollStrategy { + private static final Logger LOG = LoggerFactory.getLogger(RemoteFilePollingConsumerPollStrategy.class); + @Override public boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception e) throws Exception { if (consumer instanceof RemoteFileConsumer) { @@ -34,16 +38,16 @@ public class RemoteFilePollingConsumerPollStrategy extends DefaultPollingConsume if (rfc.isRunAllowed()) { // disconnect from the server to force it to re login at next // poll to recover - if (log.isWarnEnabled()) { - log.warn("Trying to recover by force disconnecting from remote server and re-connecting at next poll: {}", + if (LOG.isWarnEnabled()) { + LOG.warn("Trying to recover by force disconnecting from remote server and re-connecting at next poll: {}", rfc.remoteServer()); } try { rfc.forceDisconnect(); } catch (Exception t) { // ignore the exception - if (log.isDebugEnabled()) { - log.debug("Error occurred during force disconnecting from: {}. This exception will be ignored.", + if (LOG.isDebugEnabled()) { + LOG.debug("Error occurred during force disconnecting from: {}. This exception will be ignored.", rfc.remoteServer(), t); } } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/LimitedPollingConsumerPollStrategy.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/LimitedPollingConsumerPollStrategy.java index 89c7edc3bca..25189de2786 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/LimitedPollingConsumerPollStrategy.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/LimitedPollingConsumerPollStrategy.java @@ -21,9 +21,10 @@ import java.util.Map; import org.apache.camel.Consumer; import org.apache.camel.Endpoint; -import org.apache.camel.Service; import org.apache.camel.support.DefaultPollingConsumerPollStrategy; import org.apache.camel.support.service.ServiceHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A {@link org.apache.camel.spi.PollingConsumerPollStrategy} which supports suspending consumers if they failed for X @@ -33,7 +34,9 @@ import org.apache.camel.support.service.ServiceHelper; * will be suspended/stopped. This prevents the log to get flooded with failed attempts, for example during nightly * runs. */ -public class LimitedPollingConsumerPollStrategy extends DefaultPollingConsumerPollStrategy implements Service { +public class LimitedPollingConsumerPollStrategy extends DefaultPollingConsumerPollStrategy { + + private static final Logger LOG = LoggerFactory.getLogger(LimitedPollingConsumerPollStrategy.class); private final Map<Consumer, Integer> state = new HashMap<>(); private int limit = 3; @@ -69,7 +72,7 @@ public class LimitedPollingConsumerPollStrategy extends DefaultPollingConsumerPo } else { times += 1; } - log.debug("Rollback occurred after {} times when consuming {}", times, endpoint); + LOG.debug("Rollback occurred after {} times when consuming {}", times, endpoint); boolean retry = false; @@ -94,7 +97,7 @@ public class LimitedPollingConsumerPollStrategy extends DefaultPollingConsumerPo * @throws Exception is thrown if error suspending the consumer */ protected void onSuspend(Consumer consumer, Endpoint endpoint) throws Exception { - log.warn("Suspending consumer {} after {} attempts to consume from {}. You have to manually resume the consumer!", + LOG.warn("Suspending consumer {} after {} attempts to consume from {}. You have to manually resume the consumer!", consumer, limit, endpoint); ServiceHelper.suspendService(consumer); } @@ -104,7 +107,7 @@ public class LimitedPollingConsumerPollStrategy extends DefaultPollingConsumerPo * * @param consumer the consumer * @param endpoint the endpoint - * @return whether or not to retry immediately, is default <tt>false</tt> + * @return whether to retry immediately, is default <tt>false</tt> * @throws Exception can be thrown in case something goes wrong */ protected boolean onRollback(Consumer consumer, Endpoint endpoint) throws Exception { @@ -113,12 +116,8 @@ public class LimitedPollingConsumerPollStrategy extends DefaultPollingConsumerPo } @Override - public void start() { - // noop - } - - @Override - public void stop() { + protected void doStop() throws Exception { state.clear(); } + } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPollingConsumerPollStrategy.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPollingConsumerPollStrategy.java index 6cf252f4e29..d62ecf9a235 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPollingConsumerPollStrategy.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPollingConsumerPollStrategy.java @@ -19,15 +19,12 @@ package org.apache.camel.support; import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.spi.PollingConsumerPollStrategy; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.camel.support.service.ServiceSupport; /** * A default implementation that will not retry on rollback. */ -public class DefaultPollingConsumerPollStrategy implements PollingConsumerPollStrategy { - - protected final Logger log = LoggerFactory.getLogger(getClass()); +public class DefaultPollingConsumerPollStrategy extends ServiceSupport implements PollingConsumerPollStrategy { @Override public boolean begin(Consumer consumer, Endpoint endpoint) { diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java index 6d02602c9c6..97d38d0f5b9 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java @@ -61,7 +61,7 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer private long delay = 500; private TimeUnit timeUnit = TimeUnit.MILLISECONDS; private boolean useFixedDelay = true; - private PollingConsumerPollStrategy pollStrategy = new DefaultPollingConsumerPollStrategy(); + private PollingConsumerPollStrategy pollStrategy; private LoggingLevel runLoggingLevel = LoggingLevel.TRACE; private boolean sendEmptyMessageWhenIdle; private boolean greedy; @@ -615,8 +615,9 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer LOG.debug("Using backoff[multiplier={}, idleThreshold={}, errorThreshold={}] on {}", backoffMultiplier, backoffIdleThreshold, backoffErrorThreshold, getEndpoint()); } - - ObjectHelper.notNull(pollStrategy, "pollStrategy", this); + if (pollStrategy == null) { + pollStrategy = new DefaultPollingConsumerPollStrategy(); + } } @Override diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollEndpoint.java b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollEndpoint.java index ea42b8c93f5..2f3976d3db0 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollEndpoint.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollEndpoint.java @@ -61,7 +61,7 @@ public abstract class ScheduledPollEndpoint extends DefaultEndpoint { @UriParam(label = "consumer,advanced", description = "A pluggable org.apache.camel.PollingConsumerPollingStrategy allowing you to provide your custom implementation" + " to control error handling usually occurred during the poll operation before an Exchange have been created and being routed in Camel.") - private PollingConsumerPollStrategy pollStrategy = new DefaultPollingConsumerPollStrategy(); + private PollingConsumerPollStrategy pollStrategy; @UriParam(defaultValue = "TRACE", label = "consumer,scheduler", description = "The consumer logs a start/complete log line when it polls. This option allows you to configure the logging level for that.") private LoggingLevel runLoggingLevel = LoggingLevel.TRACE;