This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-3.14.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.14.x by this push: new 436a019 Backport CAMEL-17472 (#6738) 436a019 is described below commit 436a019d4053eae131c79c1886ef67d86280d1f4 Author: Otavio Rodolfo Piske <orpi...@users.noreply.github.com> AuthorDate: Thu Jan 13 17:26:05 2022 +0100 Backport CAMEL-17472 (#6738) * CAMEL-17472: fix consumer reconnect no longer works Includes: - do comply with unlimited duration tasks - improved log messages for easier debug * camel-smpp: updated details about running the manual integration tests * CAMEL-17477: respect the re-connect delay when reconnecting * CAMEL-17472: do not exhaust scheduled service Includes: - allow giving more time to the shutdown of background tasks - fix preventing other tasks from being scheduled - minor related code cleanup --- .../apache/camel/component/smpp/SmppConsumer.java | 99 +++++++++++++--------- .../apache/camel/component/smpp/SmppProducer.java | 77 ++++++++++------- .../org/apache/camel/component/smpp/SmppUtils.java | 35 ++++++-- .../integration/SmppConsumerReconnectManualIT.java | 6 +- .../integration/SmppProducerReconnectManualIT.java | 6 +- .../apache/camel/support/task/BackgroundTask.java | 23 +++-- .../support/task/budget/TimeBoundedBudget.java | 7 +- .../task/BackgroundIterationTimeTaskTest.java | 18 ++++ 8 files changed, 180 insertions(+), 91 deletions(-) diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java index 2368bba..8723c1e 100644 --- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java +++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java @@ -17,6 +17,7 @@ package org.apache.camel.component.smpp; import java.io.IOException; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.locks.ReentrantLock; import org.apache.camel.Processor; @@ -32,15 +33,16 @@ import org.jsmpp.extra.SessionState; import org.jsmpp.session.BindParameter; import org.jsmpp.session.MessageReceiverListener; import org.jsmpp.session.SMPPSession; -import org.jsmpp.session.Session; import org.jsmpp.session.SessionStateListener; import org.jsmpp.util.DefaultComposer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.camel.component.smpp.SmppUtils.createExecutor; import static org.apache.camel.component.smpp.SmppUtils.isServiceStopping; import static org.apache.camel.component.smpp.SmppUtils.isSessionClosed; import static org.apache.camel.component.smpp.SmppUtils.newReconnectTask; +import static org.apache.camel.component.smpp.SmppUtils.shutdownReconnectService; /** * An implementation of consumer which use the SMPP protocol @@ -48,13 +50,16 @@ import static org.apache.camel.component.smpp.SmppUtils.newReconnectTask; public class SmppConsumer extends DefaultConsumer { private static final Logger LOG = LoggerFactory.getLogger(SmppConsumer.class); + private static final String RECONNECT_TASK_NAME = "smpp-consumer-reconnect"; - private SmppConfiguration configuration; - private SMPPSession session; - private MessageReceiverListener messageReceiverListener; - private SessionStateListener internalSessionStateListener; + private final SmppConfiguration configuration; + private final MessageReceiverListener messageReceiverListener; + private final SessionStateListener internalSessionStateListener; private final ReentrantLock reconnectLock = new ReentrantLock(); + private final ScheduledExecutorService reconnectService; + + private SMPPSession session; /** * The constructor which gets a smpp endpoint, a smpp configuration and a processor @@ -62,19 +67,19 @@ public class SmppConsumer extends DefaultConsumer { public SmppConsumer(SmppEndpoint endpoint, SmppConfiguration config, Processor processor) { super(endpoint, processor); + this.reconnectService = createExecutor(this, endpoint, RECONNECT_TASK_NAME); + this.configuration = config; - this.internalSessionStateListener = new SessionStateListener() { - @Override - public void onStateChange(SessionState newState, SessionState oldState, Session source) { - if (configuration.getSessionStateListener() != null) { - configuration.getSessionStateListener().onStateChange(newState, oldState, source); - } - - if (newState.equals(SessionState.CLOSED)) { - LOG.warn("Lost connection to: {} - trying to reconnect...", getEndpoint().getConnectionString()); - closeSession(); - reconnect(configuration.getInitialReconnectDelay()); - } + this.internalSessionStateListener = (newState, oldState, source) -> { + if (configuration.getSessionStateListener() != null) { + configuration.getSessionStateListener().onStateChange(newState, oldState, source); + } + + if (newState.equals(SessionState.CLOSED)) { + LOG.warn("Lost connection to: {} - trying to reconnect...", getEndpoint().getConnectionString()); + closeSession(); + + reconnect(configuration.getInitialReconnectDelay()); } }; this.messageReceiverListener @@ -92,21 +97,21 @@ public class SmppConsumer extends DefaultConsumer { } private SMPPSession createSession() throws IOException { - SMPPSession session = createSMPPSession(); - session.setEnquireLinkTimer(configuration.getEnquireLinkTimer()); - session.setTransactionTimer(configuration.getTransactionTimer()); - session.setPduProcessorDegree(this.configuration.getPduProcessorDegree()); - session.setQueueCapacity(this.configuration.getPduProcessorQueueCapacity()); - session.addSessionStateListener(internalSessionStateListener); - session.setMessageReceiverListener(messageReceiverListener); - session.connectAndBind(this.configuration.getHost(), this.configuration.getPort(), + SMPPSession newSession = createSMPPSession(); + newSession.setEnquireLinkTimer(configuration.getEnquireLinkTimer()); + newSession.setTransactionTimer(configuration.getTransactionTimer()); + newSession.setPduProcessorDegree(this.configuration.getPduProcessorDegree()); + newSession.setQueueCapacity(this.configuration.getPduProcessorQueueCapacity()); + newSession.addSessionStateListener(internalSessionStateListener); + newSession.setMessageReceiverListener(messageReceiverListener); + newSession.connectAndBind(this.configuration.getHost(), this.configuration.getPort(), new BindParameter( BindType.BIND_RX, this.configuration.getSystemId(), this.configuration.getPassword(), this.configuration.getSystemType(), TypeOfNumber.UNKNOWN, NumberingPlanIndicator.UNKNOWN, configuration.getAddressRange())); - return session; + return newSession; } /** @@ -125,6 +130,8 @@ public class SmppConsumer extends DefaultConsumer { @Override protected void doStop() throws Exception { + shutdownReconnectService(reconnectService); + LOG.debug("Disconnecting from: {}...", getEndpoint().getConnectionString()); super.doStop(); @@ -143,29 +150,43 @@ public class SmppConsumer extends DefaultConsumer { } private boolean doReconnect() { - if (isServiceStopping(this)) { - return true; - } - - if (isSessionClosed(session)) { - try { - LOG.info("Trying to reconnect to {}", getEndpoint().getConnectionString()); - session = createSession(); + try { + LOG.info("Trying to reconnect to {}", getEndpoint().getConnectionString()); + if (isServiceStopping(this)) { return true; - } catch (IOException e) { - LOG.warn("Failed to reconnect to {}", getEndpoint().getConnectionString()); - closeSession(); + } - return false; + if (isSessionClosed(session)) { + return tryCreateSession(); } + + LOG.info("Nothing to do: the session is not closed"); + } catch (Exception e) { + LOG.error("Unable to reconnect to {}: {}", getEndpoint().getConnectionString(), e.getMessage(), e); + return false; } return true; } + private boolean tryCreateSession() { + try { + LOG.info("Creating a new session to {}", getEndpoint().getConnectionString()); + session = createSession(); + LOG.info("Reconnected to {}", getEndpoint().getConnectionString()); + return true; + } catch (IOException e) { + LOG.warn("Failed to reconnect to {}", getEndpoint().getConnectionString()); + closeSession(); + + return false; + } + } + private void reconnect(final long initialReconnectDelay) { if (reconnectLock.tryLock()) { - BlockingTask task = newReconnectTask(this, getEndpoint(), initialReconnectDelay, + BlockingTask task = newReconnectTask(reconnectService, RECONNECT_TASK_NAME, initialReconnectDelay, + configuration.getReconnectDelay(), configuration.getMaxReconnect()); try { diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java index a7bc510..c56d78e 100644 --- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java +++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java @@ -17,6 +17,7 @@ package org.apache.camel.component.smpp; import java.io.IOException; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.locks.ReentrantLock; import org.apache.camel.Exchange; @@ -32,15 +33,16 @@ import org.jsmpp.bean.TypeOfNumber; import org.jsmpp.extra.SessionState; import org.jsmpp.session.BindParameter; import org.jsmpp.session.SMPPSession; -import org.jsmpp.session.Session; import org.jsmpp.session.SessionStateListener; import org.jsmpp.util.DefaultComposer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.camel.component.smpp.SmppUtils.createExecutor; import static org.apache.camel.component.smpp.SmppUtils.isServiceStopping; import static org.apache.camel.component.smpp.SmppUtils.isSessionClosed; import static org.apache.camel.component.smpp.SmppUtils.newReconnectTask; +import static org.apache.camel.component.smpp.SmppUtils.shutdownReconnectService; /** * An implementation of @{link Producer} which use the SMPP protocol @@ -48,27 +50,30 @@ import static org.apache.camel.component.smpp.SmppUtils.newReconnectTask; public class SmppProducer extends DefaultProducer { private static final Logger LOG = LoggerFactory.getLogger(SmppProducer.class); + private static final String RECONNECT_TASK_NAME = "smpp-producer-reconnect"; - private SmppConfiguration configuration; - private SMPPSession session; - private SessionStateListener internalSessionStateListener; + private final SmppConfiguration configuration; + private final SessionStateListener internalSessionStateListener; private final ReentrantLock connectLock = new ReentrantLock(); + private final ScheduledExecutorService reconnectService; + + private SMPPSession session; public SmppProducer(SmppEndpoint endpoint, SmppConfiguration config) { super(endpoint); + + this.reconnectService = createExecutor(this, endpoint, RECONNECT_TASK_NAME); + this.configuration = config; - this.internalSessionStateListener = new SessionStateListener() { - @Override - public void onStateChange(SessionState newState, SessionState oldState, Session source) { - if (configuration.getSessionStateListener() != null) { - configuration.getSessionStateListener().onStateChange(newState, oldState, source); - } + this.internalSessionStateListener = (newState, oldState, source) -> { + if (configuration.getSessionStateListener() != null) { + configuration.getSessionStateListener().onStateChange(newState, oldState, source); + } - if (newState.equals(SessionState.CLOSED)) { - LOG.warn("Lost connection to: {} - trying to reconnect...", getEndpoint().getConnectionString()); - closeSession(); - reconnect(configuration.getInitialReconnectDelay()); - } + if (newState.equals(SessionState.CLOSED)) { + LOG.warn("Lost connection to: {} - trying to reconnect...", getEndpoint().getConnectionString()); + closeSession(); + reconnect(configuration.getInitialReconnectDelay()); } }; } @@ -164,6 +169,8 @@ public class SmppProducer extends DefaultProducer { @Override protected void doStop() throws Exception { + shutdownReconnectService(reconnectService); + LOG.debug("Disconnecting from: {}...", getEndpoint().getConnectionString()); super.doStop(); @@ -183,8 +190,8 @@ public class SmppProducer extends DefaultProducer { private void reconnect(final long initialReconnectDelay) { if (connectLock.tryLock()) { - BlockingTask task = newReconnectTask(this, getEndpoint(), initialReconnectDelay, - configuration.getMaxReconnect()); + BlockingTask task = newReconnectTask(reconnectService, RECONNECT_TASK_NAME, initialReconnectDelay, + configuration.getReconnectDelay(), configuration.getMaxReconnect()); try { task.run(this::doReconnect); @@ -195,26 +202,38 @@ public class SmppProducer extends DefaultProducer { } private boolean doReconnect() { - if (isServiceStopping(this)) { - return true; - } - - if (isSessionClosed(session)) { - try { - LOG.info("Trying to reconnect to {}", getEndpoint().getConnectionString()); - session = createSession(); + try { + LOG.info("Trying to reconnect to {}", getEndpoint().getConnectionString()); + if (isServiceStopping(this)) { return true; - } catch (IOException e) { - LOG.warn("Failed to reconnect to {}", getEndpoint().getConnectionString()); - closeSession(); + } - return false; + if (isSessionClosed(session)) { + return tryCreateSession(); } + + LOG.info("Nothing to do: the session is not closed"); + } catch (Exception e) { + LOG.error("Unable to reconnect to {}: {}", getEndpoint().getConnectionString(), e.getMessage(), e); + return false; } return true; } + private boolean tryCreateSession() { + try { + + session = createSession(); + return true; + } catch (IOException e) { + LOG.warn("Failed to reconnect to {}", getEndpoint().getConnectionString()); + closeSession(); + + return false; + } + } + @Override public SmppEndpoint getEndpoint() { return (SmppEndpoint) super.getEndpoint(); diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java index 2d2eaf0..44867e0 100644 --- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java +++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java @@ -19,9 +19,12 @@ package org.apache.camel.component.smpp; import java.time.Duration; import java.util.Calendar; import java.util.Date; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.camel.Endpoint; +import org.apache.camel.spi.ExecutorServiceManager; import org.apache.camel.support.service.BaseService; import org.apache.camel.support.task.BlockingTask; import org.apache.camel.support.task.Tasks; @@ -34,9 +37,10 @@ import org.jsmpp.extra.SessionState; import org.jsmpp.session.SMPPSession; import org.jsmpp.util.AbsoluteTimeFormatter; import org.jsmpp.util.TimeFormatter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public final class SmppUtils { - /** * See http://unicode.org/Public/MAPPINGS/ETSI/GSM0338.TXT */ @@ -67,6 +71,7 @@ public final class SmppUtils { { 60, 91 }, { 61, 126 }, { 62, 93 }, { 64, 124 }, { 101, 164 } }; + private static final Logger LOG = LoggerFactory.getLogger(SmppUtils.class); private static final TimeFormatter TIME_FORMATTER = new AbsoluteTimeFormatter(); private SmppUtils() { @@ -281,21 +286,37 @@ public final class SmppUtils { return session == null || session.getSessionState().equals(SessionState.CLOSED); } - public static BlockingTask newReconnectTask( - BaseService source, Endpoint endpoint, long initialReconnectDelay, - int maxReconnect) { - final String taskName = "smpp-reconnect"; - ScheduledExecutorService service = endpoint.getCamelContext().getExecutorServiceManager() - .newSingleThreadScheduledExecutor(source, taskName); + public static ScheduledExecutorService createExecutor(BaseService service, Endpoint endpoint, String taskName) { + if (endpoint.getCamelContext() != null && endpoint.getCamelContext().getExecutorServiceManager() != null) { + ExecutorServiceManager manager = endpoint.getCamelContext().getExecutorServiceManager(); + return manager.newSingleThreadScheduledExecutor(service, taskName); + } else { + LOG.warn("Not using the Camel scheduled thread executor"); + return Executors.newSingleThreadScheduledExecutor(); + } + } + public static BlockingTask newReconnectTask( + ScheduledExecutorService service, String taskName, long initialReconnectDelay, + long reconnectDelay, int maxReconnect) { return Tasks.backgroundTask() .withBudget(Budgets.iterationTimeBudget() .withInitialDelay(Duration.ofMillis(initialReconnectDelay)) .withMaxIterations(maxReconnect) .withUnlimitedDuration() + .withInterval(Duration.ofMillis(reconnectDelay)) .build()) .withScheduledExecutor(service) .withName(taskName) .build(); } + + public static void shutdownReconnectService(ScheduledExecutorService service) throws InterruptedException { + service.shutdown(); + if (!service.awaitTermination(1, TimeUnit.SECONDS)) { + LOG.warn("The reconnect service did not finish executing within the timeout"); + + service.shutdownNow(); + } + } } diff --git a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppConsumerReconnectManualIT.java b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppConsumerReconnectManualIT.java index 396aa65..4063ed3 100644 --- a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppConsumerReconnectManualIT.java +++ b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppConsumerReconnectManualIT.java @@ -24,7 +24,11 @@ import org.junit.jupiter.api.Test; /** * Spring based integration test for the smpp component. To run this test, ensure that the SMSC is running on: host: * localhost port: 2775 user: smppclient password: password <br/> - * A SMSC for test is available here: http://www.seleniumsoftware.com/downloads.html + * In the past, a SMSC for test was available here: http://www.seleniumsoftware.com/downloads.html. + * + * Since it is not available anymore, it's possible to test the reconnect logic manually using the nc CLI tool: + * + * nc -lv 2775 */ @Disabled("Must be manually tested") public class SmppConsumerReconnectManualIT extends CamelTestSupport { diff --git a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppProducerReconnectManualIT.java b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppProducerReconnectManualIT.java index 824620e..3f74b2c 100644 --- a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppProducerReconnectManualIT.java +++ b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppProducerReconnectManualIT.java @@ -24,7 +24,11 @@ import org.junit.jupiter.api.Test; /** * Spring based integration test for the smpp component. To run this test, ensure that the SMSC is running on: host: * localhost port: 2775 user: smppclient password: password <br/> - * A SMSC for test is available here: http://www.seleniumsoftware.com/downloads.html + * In the past, a SMSC for test was available here: http://www.seleniumsoftware.com/downloads.html. + * + * Since it is not available anymore, it's possible to test the reconnect logic manually using the nc CLI tool: + * + * nc -lv 2775 */ @Disabled("Must be manually tested") public class SmppProducerReconnectManualIT extends CamelTestSupport { diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java b/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java index 3f4ee1e..01fe497 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java @@ -20,6 +20,7 @@ package org.apache.camel.support.task; import java.time.Duration; import java.util.Objects; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.BooleanSupplier; @@ -127,29 +128,27 @@ public class BackgroundTask implements BlockingTask { public <T> boolean run(Predicate<T> predicate, T payload) { CountDownLatch latch = new CountDownLatch(1); - // We need it to be cancellable/non-runnable after reaching a certain point, and it needs to be deterministic. - // This is why we ignore the ScheduledFuture returned and implement the go/no-go using a latch. - service.scheduleAtFixedRate(() -> runTaskWrapper(latch, predicate, payload), + Future<?> task = service.scheduleAtFixedRate(() -> runTaskWrapper(latch, predicate, payload), budget.initialDelay(), budget.interval(), TimeUnit.MILLISECONDS); - return waitForTaskCompletion(latch, service); + return waitForTaskCompletion(latch, task); } @Override public boolean run(BooleanSupplier supplier) { CountDownLatch latch = new CountDownLatch(1); - // We need it to be cancellable/non-runnable after reaching a certain point, and it needs to be deterministic. - // This is why we ignore the ScheduledFuture returned and implement the go/no-go using a latch. - service.scheduleAtFixedRate(() -> runTaskWrapper(latch, supplier), budget.initialDelay(), + Future<?> task = service.scheduleAtFixedRate(() -> runTaskWrapper(latch, supplier), budget.initialDelay(), budget.interval(), TimeUnit.MILLISECONDS); - return waitForTaskCompletion(latch, service); + return waitForTaskCompletion(latch, task); } - private boolean waitForTaskCompletion(CountDownLatch latch, ScheduledExecutorService service) { + private boolean waitForTaskCompletion(CountDownLatch latch, Future<?> task) { boolean completed = false; try { + // We need it to be cancellable/non-runnable after reaching a certain point, and it needs to be deterministic. + // This is why we ignore the ScheduledFuture returned and implement the go/no-go using a latch. if (budget.maxDuration() == TimeBoundedBudget.UNLIMITED_DURATION) { latch.await(); completed = true; @@ -163,14 +162,12 @@ public class BackgroundTask implements BlockingTask { } } - service.shutdown(); - service.awaitTermination(1, TimeUnit.SECONDS); + task.cancel(true); } catch (InterruptedException e) { - LOG.warn("Interrupted while waiting for the repeatable task to execute"); + LOG.warn("Interrupted while waiting for the repeatable task to execute: {}", e.getMessage(), e); Thread.currentThread().interrupt(); } finally { elapsed = budget.elapsed(); - service.shutdownNow(); } return completed; diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/budget/TimeBoundedBudget.java b/core/camel-support/src/main/java/org/apache/camel/support/task/budget/TimeBoundedBudget.java index 1c57329..ed48374 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/task/budget/TimeBoundedBudget.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/task/budget/TimeBoundedBudget.java @@ -55,7 +55,12 @@ public class TimeBoundedBudget implements TimeBudget { @Override public boolean canContinue() { - // ... if time budget is NOT exhausted + // ... unless running forever + if (maxDuration == UNLIMITED_DURATION) { + return true; + } + + // ... or if time budget is NOT exhausted if (elapsed().toMillis() >= maxDuration) { return false; } diff --git a/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java b/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java index b4fb5d2..cd5f7fb 100644 --- a/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java +++ b/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java @@ -145,4 +145,22 @@ public class BackgroundIterationTimeTaskTest extends TaskTestSupport { assertTrue(taskCount < maxIterations); assertFalse(completed, "The task did not complete because of timeout, the return should be false"); } + + @DisplayName("Test that the task runs until the boolean supplier succeeds") + @Test + @Timeout(10) + void testRunNoMoreBooleanSupplierWithForever() { + BackgroundTask task = Tasks.backgroundTask() + .withScheduledExecutor(Executors.newSingleThreadScheduledExecutor()) + .withBudget(Budgets.iterationTimeBudget() + .withMaxIterations(Integer.MAX_VALUE) + .withInitialDelay(Duration.ofSeconds(1)) + .withUnlimitedDuration() + .build()) + .build(); + + boolean completed = task.run(this::taskPredicateWithDeterministicStop, 4); + assertTrue(maxIterations > taskCount, "The task execution should not exceed the max iterations"); + assertTrue(completed, "The task did not complete, the return should be false"); + } }