This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.14.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.14.x by this push:
     new 436a019  Backport CAMEL-17472 (#6738)
436a019 is described below

commit 436a019d4053eae131c79c1886ef67d86280d1f4
Author: Otavio Rodolfo Piske <orpi...@users.noreply.github.com>
AuthorDate: Thu Jan 13 17:26:05 2022 +0100

    Backport CAMEL-17472 (#6738)
    
    * CAMEL-17472: fix consumer reconnect no longer works
    
    Includes:
    - do comply with unlimited duration tasks
    - improved log messages for easier debug
    
    * camel-smpp: updated details about running the manual integration tests
    
    * CAMEL-17477: respect the re-connect delay when reconnecting
    
    * CAMEL-17472: do not exhaust scheduled service
    
    Includes:
    - allow giving more time to the shutdown of background tasks
    - fix preventing other tasks from being scheduled
    - minor related code cleanup
---
 .../apache/camel/component/smpp/SmppConsumer.java  | 99 +++++++++++++---------
 .../apache/camel/component/smpp/SmppProducer.java  | 77 ++++++++++-------
 .../org/apache/camel/component/smpp/SmppUtils.java | 35 ++++++--
 .../integration/SmppConsumerReconnectManualIT.java |  6 +-
 .../integration/SmppProducerReconnectManualIT.java |  6 +-
 .../apache/camel/support/task/BackgroundTask.java  | 23 +++--
 .../support/task/budget/TimeBoundedBudget.java     |  7 +-
 .../task/BackgroundIterationTimeTaskTest.java      | 18 ++++
 8 files changed, 180 insertions(+), 91 deletions(-)

diff --git 
a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
 
b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
index 2368bba..8723c1e 100644
--- 
a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
+++ 
b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.smpp;
 
 import java.io.IOException;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.camel.Processor;
@@ -32,15 +33,16 @@ import org.jsmpp.extra.SessionState;
 import org.jsmpp.session.BindParameter;
 import org.jsmpp.session.MessageReceiverListener;
 import org.jsmpp.session.SMPPSession;
-import org.jsmpp.session.Session;
 import org.jsmpp.session.SessionStateListener;
 import org.jsmpp.util.DefaultComposer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.camel.component.smpp.SmppUtils.createExecutor;
 import static org.apache.camel.component.smpp.SmppUtils.isServiceStopping;
 import static org.apache.camel.component.smpp.SmppUtils.isSessionClosed;
 import static org.apache.camel.component.smpp.SmppUtils.newReconnectTask;
+import static 
org.apache.camel.component.smpp.SmppUtils.shutdownReconnectService;
 
 /**
  * An implementation of consumer which use the SMPP protocol
@@ -48,13 +50,16 @@ import static 
org.apache.camel.component.smpp.SmppUtils.newReconnectTask;
 public class SmppConsumer extends DefaultConsumer {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(SmppConsumer.class);
+    private static final String RECONNECT_TASK_NAME = 
"smpp-consumer-reconnect";
 
-    private SmppConfiguration configuration;
-    private SMPPSession session;
-    private MessageReceiverListener messageReceiverListener;
-    private SessionStateListener internalSessionStateListener;
+    private final SmppConfiguration configuration;
+    private final MessageReceiverListener messageReceiverListener;
+    private final SessionStateListener internalSessionStateListener;
 
     private final ReentrantLock reconnectLock = new ReentrantLock();
+    private final ScheduledExecutorService reconnectService;
+
+    private SMPPSession session;
 
     /**
      * The constructor which gets a smpp endpoint, a smpp configuration and a 
processor
@@ -62,19 +67,19 @@ public class SmppConsumer extends DefaultConsumer {
     public SmppConsumer(SmppEndpoint endpoint, SmppConfiguration config, 
Processor processor) {
         super(endpoint, processor);
 
+        this.reconnectService = createExecutor(this, endpoint, 
RECONNECT_TASK_NAME);
+
         this.configuration = config;
-        this.internalSessionStateListener = new SessionStateListener() {
-            @Override
-            public void onStateChange(SessionState newState, SessionState 
oldState, Session source) {
-                if (configuration.getSessionStateListener() != null) {
-                    
configuration.getSessionStateListener().onStateChange(newState, oldState, 
source);
-                }
-
-                if (newState.equals(SessionState.CLOSED)) {
-                    LOG.warn("Lost connection to: {} - trying to 
reconnect...", getEndpoint().getConnectionString());
-                    closeSession();
-                    reconnect(configuration.getInitialReconnectDelay());
-                }
+        this.internalSessionStateListener = (newState, oldState, source) -> {
+            if (configuration.getSessionStateListener() != null) {
+                
configuration.getSessionStateListener().onStateChange(newState, oldState, 
source);
+            }
+
+            if (newState.equals(SessionState.CLOSED)) {
+                LOG.warn("Lost connection to: {} - trying to reconnect...", 
getEndpoint().getConnectionString());
+                closeSession();
+
+                reconnect(configuration.getInitialReconnectDelay());
             }
         };
         this.messageReceiverListener
@@ -92,21 +97,21 @@ public class SmppConsumer extends DefaultConsumer {
     }
 
     private SMPPSession createSession() throws IOException {
-        SMPPSession session = createSMPPSession();
-        session.setEnquireLinkTimer(configuration.getEnquireLinkTimer());
-        session.setTransactionTimer(configuration.getTransactionTimer());
-        
session.setPduProcessorDegree(this.configuration.getPduProcessorDegree());
-        
session.setQueueCapacity(this.configuration.getPduProcessorQueueCapacity());
-        session.addSessionStateListener(internalSessionStateListener);
-        session.setMessageReceiverListener(messageReceiverListener);
-        session.connectAndBind(this.configuration.getHost(), 
this.configuration.getPort(),
+        SMPPSession newSession = createSMPPSession();
+        newSession.setEnquireLinkTimer(configuration.getEnquireLinkTimer());
+        newSession.setTransactionTimer(configuration.getTransactionTimer());
+        
newSession.setPduProcessorDegree(this.configuration.getPduProcessorDegree());
+        
newSession.setQueueCapacity(this.configuration.getPduProcessorQueueCapacity());
+        newSession.addSessionStateListener(internalSessionStateListener);
+        newSession.setMessageReceiverListener(messageReceiverListener);
+        newSession.connectAndBind(this.configuration.getHost(), 
this.configuration.getPort(),
                 new BindParameter(
                         BindType.BIND_RX, this.configuration.getSystemId(),
                         this.configuration.getPassword(), 
this.configuration.getSystemType(),
                         TypeOfNumber.UNKNOWN, NumberingPlanIndicator.UNKNOWN,
                         configuration.getAddressRange()));
 
-        return session;
+        return newSession;
     }
 
     /**
@@ -125,6 +130,8 @@ public class SmppConsumer extends DefaultConsumer {
 
     @Override
     protected void doStop() throws Exception {
+        shutdownReconnectService(reconnectService);
+
         LOG.debug("Disconnecting from: {}...", 
getEndpoint().getConnectionString());
 
         super.doStop();
@@ -143,29 +150,43 @@ public class SmppConsumer extends DefaultConsumer {
     }
 
     private boolean doReconnect() {
-        if (isServiceStopping(this)) {
-            return true;
-        }
-
-        if (isSessionClosed(session)) {
-            try {
-                LOG.info("Trying to reconnect to {}", 
getEndpoint().getConnectionString());
-                session = createSession();
+        try {
+            LOG.info("Trying to reconnect to {}", 
getEndpoint().getConnectionString());
+            if (isServiceStopping(this)) {
                 return true;
-            } catch (IOException e) {
-                LOG.warn("Failed to reconnect to {}", 
getEndpoint().getConnectionString());
-                closeSession();
+            }
 
-                return false;
+            if (isSessionClosed(session)) {
+                return tryCreateSession();
             }
+
+            LOG.info("Nothing to do: the session is not closed");
+        } catch (Exception e) {
+            LOG.error("Unable to reconnect to {}: {}", 
getEndpoint().getConnectionString(), e.getMessage(), e);
+            return false;
         }
 
         return true;
     }
 
+    private boolean tryCreateSession() {
+        try {
+            LOG.info("Creating a new session to {}", 
getEndpoint().getConnectionString());
+            session = createSession();
+            LOG.info("Reconnected to {}", getEndpoint().getConnectionString());
+            return true;
+        } catch (IOException e) {
+            LOG.warn("Failed to reconnect to {}", 
getEndpoint().getConnectionString());
+            closeSession();
+
+            return false;
+        }
+    }
+
     private void reconnect(final long initialReconnectDelay) {
         if (reconnectLock.tryLock()) {
-            BlockingTask task = newReconnectTask(this, getEndpoint(), 
initialReconnectDelay,
+            BlockingTask task = newReconnectTask(reconnectService, 
RECONNECT_TASK_NAME, initialReconnectDelay,
+                    configuration.getReconnectDelay(),
                     configuration.getMaxReconnect());
 
             try {
diff --git 
a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
 
b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
index a7bc510..c56d78e 100644
--- 
a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
+++ 
b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.smpp;
 
 import java.io.IOException;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.camel.Exchange;
@@ -32,15 +33,16 @@ import org.jsmpp.bean.TypeOfNumber;
 import org.jsmpp.extra.SessionState;
 import org.jsmpp.session.BindParameter;
 import org.jsmpp.session.SMPPSession;
-import org.jsmpp.session.Session;
 import org.jsmpp.session.SessionStateListener;
 import org.jsmpp.util.DefaultComposer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.camel.component.smpp.SmppUtils.createExecutor;
 import static org.apache.camel.component.smpp.SmppUtils.isServiceStopping;
 import static org.apache.camel.component.smpp.SmppUtils.isSessionClosed;
 import static org.apache.camel.component.smpp.SmppUtils.newReconnectTask;
+import static 
org.apache.camel.component.smpp.SmppUtils.shutdownReconnectService;
 
 /**
  * An implementation of @{link Producer} which use the SMPP protocol
@@ -48,27 +50,30 @@ import static 
org.apache.camel.component.smpp.SmppUtils.newReconnectTask;
 public class SmppProducer extends DefaultProducer {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(SmppProducer.class);
+    private static final String RECONNECT_TASK_NAME = 
"smpp-producer-reconnect";
 
-    private SmppConfiguration configuration;
-    private SMPPSession session;
-    private SessionStateListener internalSessionStateListener;
+    private final SmppConfiguration configuration;
+    private final SessionStateListener internalSessionStateListener;
     private final ReentrantLock connectLock = new ReentrantLock();
+    private final ScheduledExecutorService reconnectService;
+
+    private SMPPSession session;
 
     public SmppProducer(SmppEndpoint endpoint, SmppConfiguration config) {
         super(endpoint);
+
+        this.reconnectService = createExecutor(this, endpoint, 
RECONNECT_TASK_NAME);
+
         this.configuration = config;
-        this.internalSessionStateListener = new SessionStateListener() {
-            @Override
-            public void onStateChange(SessionState newState, SessionState 
oldState, Session source) {
-                if (configuration.getSessionStateListener() != null) {
-                    
configuration.getSessionStateListener().onStateChange(newState, oldState, 
source);
-                }
+        this.internalSessionStateListener = (newState, oldState, source) -> {
+            if (configuration.getSessionStateListener() != null) {
+                
configuration.getSessionStateListener().onStateChange(newState, oldState, 
source);
+            }
 
-                if (newState.equals(SessionState.CLOSED)) {
-                    LOG.warn("Lost connection to: {} - trying to 
reconnect...", getEndpoint().getConnectionString());
-                    closeSession();
-                    reconnect(configuration.getInitialReconnectDelay());
-                }
+            if (newState.equals(SessionState.CLOSED)) {
+                LOG.warn("Lost connection to: {} - trying to reconnect...", 
getEndpoint().getConnectionString());
+                closeSession();
+                reconnect(configuration.getInitialReconnectDelay());
             }
         };
     }
@@ -164,6 +169,8 @@ public class SmppProducer extends DefaultProducer {
 
     @Override
     protected void doStop() throws Exception {
+        shutdownReconnectService(reconnectService);
+
         LOG.debug("Disconnecting from: {}...", 
getEndpoint().getConnectionString());
 
         super.doStop();
@@ -183,8 +190,8 @@ public class SmppProducer extends DefaultProducer {
 
     private void reconnect(final long initialReconnectDelay) {
         if (connectLock.tryLock()) {
-            BlockingTask task = newReconnectTask(this, getEndpoint(), 
initialReconnectDelay,
-                    configuration.getMaxReconnect());
+            BlockingTask task = newReconnectTask(reconnectService, 
RECONNECT_TASK_NAME, initialReconnectDelay,
+                    configuration.getReconnectDelay(), 
configuration.getMaxReconnect());
 
             try {
                 task.run(this::doReconnect);
@@ -195,26 +202,38 @@ public class SmppProducer extends DefaultProducer {
     }
 
     private boolean doReconnect() {
-        if (isServiceStopping(this)) {
-            return true;
-        }
-
-        if (isSessionClosed(session)) {
-            try {
-                LOG.info("Trying to reconnect to {}", 
getEndpoint().getConnectionString());
-                session = createSession();
+        try {
+            LOG.info("Trying to reconnect to {}", 
getEndpoint().getConnectionString());
+            if (isServiceStopping(this)) {
                 return true;
-            } catch (IOException e) {
-                LOG.warn("Failed to reconnect to {}", 
getEndpoint().getConnectionString());
-                closeSession();
+            }
 
-                return false;
+            if (isSessionClosed(session)) {
+                return tryCreateSession();
             }
+
+            LOG.info("Nothing to do: the session is not closed");
+        } catch (Exception e) {
+            LOG.error("Unable to reconnect to {}: {}", 
getEndpoint().getConnectionString(), e.getMessage(), e);
+            return false;
         }
 
         return true;
     }
 
+    private boolean tryCreateSession() {
+        try {
+
+            session = createSession();
+            return true;
+        } catch (IOException e) {
+            LOG.warn("Failed to reconnect to {}", 
getEndpoint().getConnectionString());
+            closeSession();
+
+            return false;
+        }
+    }
+
     @Override
     public SmppEndpoint getEndpoint() {
         return (SmppEndpoint) super.getEndpoint();
diff --git 
a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java
 
b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java
index 2d2eaf0..44867e0 100644
--- 
a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java
+++ 
b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java
@@ -19,9 +19,12 @@ package org.apache.camel.component.smpp;
 import java.time.Duration;
 import java.util.Calendar;
 import java.util.Date;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.Endpoint;
+import org.apache.camel.spi.ExecutorServiceManager;
 import org.apache.camel.support.service.BaseService;
 import org.apache.camel.support.task.BlockingTask;
 import org.apache.camel.support.task.Tasks;
@@ -34,9 +37,10 @@ import org.jsmpp.extra.SessionState;
 import org.jsmpp.session.SMPPSession;
 import org.jsmpp.util.AbsoluteTimeFormatter;
 import org.jsmpp.util.TimeFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public final class SmppUtils {
-
     /**
      * See http://unicode.org/Public/MAPPINGS/ETSI/GSM0338.TXT
      */
@@ -67,6 +71,7 @@ public final class SmppUtils {
             { 60, 91 }, { 61, 126 }, { 62, 93 }, { 64, 124 }, { 101, 164 }
     };
 
+    private static final Logger LOG = LoggerFactory.getLogger(SmppUtils.class);
     private static final TimeFormatter TIME_FORMATTER = new 
AbsoluteTimeFormatter();
 
     private SmppUtils() {
@@ -281,21 +286,37 @@ public final class SmppUtils {
         return session == null || 
session.getSessionState().equals(SessionState.CLOSED);
     }
 
-    public static BlockingTask newReconnectTask(
-            BaseService source, Endpoint endpoint, long initialReconnectDelay,
-            int maxReconnect) {
-        final String taskName = "smpp-reconnect";
-        ScheduledExecutorService service = 
endpoint.getCamelContext().getExecutorServiceManager()
-                .newSingleThreadScheduledExecutor(source, taskName);
+    public static ScheduledExecutorService createExecutor(BaseService service, 
Endpoint endpoint, String taskName) {
+        if (endpoint.getCamelContext() != null && 
endpoint.getCamelContext().getExecutorServiceManager() != null) {
+            ExecutorServiceManager manager = 
endpoint.getCamelContext().getExecutorServiceManager();
+            return manager.newSingleThreadScheduledExecutor(service, taskName);
+        } else {
+            LOG.warn("Not using the Camel scheduled thread executor");
+            return Executors.newSingleThreadScheduledExecutor();
+        }
+    }
 
+    public static BlockingTask newReconnectTask(
+            ScheduledExecutorService service, String taskName, long 
initialReconnectDelay,
+            long reconnectDelay, int maxReconnect) {
         return Tasks.backgroundTask()
                 .withBudget(Budgets.iterationTimeBudget()
                         
.withInitialDelay(Duration.ofMillis(initialReconnectDelay))
                         .withMaxIterations(maxReconnect)
                         .withUnlimitedDuration()
+                        .withInterval(Duration.ofMillis(reconnectDelay))
                         .build())
                 .withScheduledExecutor(service)
                 .withName(taskName)
                 .build();
     }
+
+    public static void shutdownReconnectService(ScheduledExecutorService 
service) throws InterruptedException {
+        service.shutdown();
+        if (!service.awaitTermination(1, TimeUnit.SECONDS)) {
+            LOG.warn("The reconnect service did not finish executing within 
the timeout");
+
+            service.shutdownNow();
+        }
+    }
 }
diff --git 
a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppConsumerReconnectManualIT.java
 
b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppConsumerReconnectManualIT.java
index 396aa65..4063ed3 100644
--- 
a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppConsumerReconnectManualIT.java
+++ 
b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppConsumerReconnectManualIT.java
@@ -24,7 +24,11 @@ import org.junit.jupiter.api.Test;
 /**
  * Spring based integration test for the smpp component. To run this test, 
ensure that the SMSC is running on: host:
  * localhost port: 2775 user: smppclient password: password <br/>
- * A SMSC for test is available here: 
http://www.seleniumsoftware.com/downloads.html
+ * In the past, a SMSC for test was available here: 
http://www.seleniumsoftware.com/downloads.html.
+ * 
+ * Since it is not available anymore, it's possible to test the reconnect 
logic manually using the nc CLI tool:
+ *
+ * nc -lv 2775
  */
 @Disabled("Must be manually tested")
 public class SmppConsumerReconnectManualIT extends CamelTestSupport {
diff --git 
a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppProducerReconnectManualIT.java
 
b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppProducerReconnectManualIT.java
index 824620e..3f74b2c 100644
--- 
a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppProducerReconnectManualIT.java
+++ 
b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppProducerReconnectManualIT.java
@@ -24,7 +24,11 @@ import org.junit.jupiter.api.Test;
 /**
  * Spring based integration test for the smpp component. To run this test, 
ensure that the SMSC is running on: host:
  * localhost port: 2775 user: smppclient password: password <br/>
- * A SMSC for test is available here: 
http://www.seleniumsoftware.com/downloads.html
+ * In the past, a SMSC for test was available here: 
http://www.seleniumsoftware.com/downloads.html.
+ *
+ * Since it is not available anymore, it's possible to test the reconnect 
logic manually using the nc CLI tool:
+ *
+ * nc -lv 2775
  */
 @Disabled("Must be manually tested")
 public class SmppProducerReconnectManualIT extends CamelTestSupport {
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
index 3f4ee1e..01fe497 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
@@ -20,6 +20,7 @@ package org.apache.camel.support.task;
 import java.time.Duration;
 import java.util.Objects;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BooleanSupplier;
@@ -127,29 +128,27 @@ public class BackgroundTask implements BlockingTask {
     public <T> boolean run(Predicate<T> predicate, T payload) {
         CountDownLatch latch = new CountDownLatch(1);
 
-        // We need it to be cancellable/non-runnable after reaching a certain 
point, and it needs to be deterministic.
-        // This is why we ignore the ScheduledFuture returned and implement 
the go/no-go using a latch.
-        service.scheduleAtFixedRate(() -> runTaskWrapper(latch, predicate, 
payload),
+        Future<?> task = service.scheduleAtFixedRate(() -> 
runTaskWrapper(latch, predicate, payload),
                 budget.initialDelay(), budget.interval(), 
TimeUnit.MILLISECONDS);
 
-        return waitForTaskCompletion(latch, service);
+        return waitForTaskCompletion(latch, task);
     }
 
     @Override
     public boolean run(BooleanSupplier supplier) {
         CountDownLatch latch = new CountDownLatch(1);
 
-        // We need it to be cancellable/non-runnable after reaching a certain 
point, and it needs to be deterministic.
-        // This is why we ignore the ScheduledFuture returned and implement 
the go/no-go using a latch.
-        service.scheduleAtFixedRate(() -> runTaskWrapper(latch, supplier), 
budget.initialDelay(),
+        Future<?> task = service.scheduleAtFixedRate(() -> 
runTaskWrapper(latch, supplier), budget.initialDelay(),
                 budget.interval(), TimeUnit.MILLISECONDS);
 
-        return waitForTaskCompletion(latch, service);
+        return waitForTaskCompletion(latch, task);
     }
 
-    private boolean waitForTaskCompletion(CountDownLatch latch, 
ScheduledExecutorService service) {
+    private boolean waitForTaskCompletion(CountDownLatch latch, Future<?> 
task) {
         boolean completed = false;
         try {
+            // We need it to be cancellable/non-runnable after reaching a 
certain point, and it needs to be deterministic.
+            // This is why we ignore the ScheduledFuture returned and 
implement the go/no-go using a latch.
             if (budget.maxDuration() == TimeBoundedBudget.UNLIMITED_DURATION) {
                 latch.await();
                 completed = true;
@@ -163,14 +162,12 @@ public class BackgroundTask implements BlockingTask {
                 }
             }
 
-            service.shutdown();
-            service.awaitTermination(1, TimeUnit.SECONDS);
+            task.cancel(true);
         } catch (InterruptedException e) {
-            LOG.warn("Interrupted while waiting for the repeatable task to 
execute");
+            LOG.warn("Interrupted while waiting for the repeatable task to 
execute: {}", e.getMessage(), e);
             Thread.currentThread().interrupt();
         } finally {
             elapsed = budget.elapsed();
-            service.shutdownNow();
         }
 
         return completed;
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/task/budget/TimeBoundedBudget.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/task/budget/TimeBoundedBudget.java
index 1c57329..ed48374 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/task/budget/TimeBoundedBudget.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/task/budget/TimeBoundedBudget.java
@@ -55,7 +55,12 @@ public class TimeBoundedBudget implements TimeBudget {
 
     @Override
     public boolean canContinue() {
-        // ... if time budget is NOT exhausted
+        // ... unless running forever
+        if (maxDuration == UNLIMITED_DURATION) {
+            return true;
+        }
+
+        // ... or if time budget is NOT exhausted
         if (elapsed().toMillis() >= maxDuration) {
             return false;
         }
diff --git 
a/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java
 
b/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java
index b4fb5d2..cd5f7fb 100644
--- 
a/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java
+++ 
b/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java
@@ -145,4 +145,22 @@ public class BackgroundIterationTimeTaskTest extends 
TaskTestSupport {
         assertTrue(taskCount < maxIterations);
         assertFalse(completed, "The task did not complete because of timeout, 
the return should be false");
     }
+
+    @DisplayName("Test that the task runs until the boolean supplier succeeds")
+    @Test
+    @Timeout(10)
+    void testRunNoMoreBooleanSupplierWithForever() {
+        BackgroundTask task = Tasks.backgroundTask()
+                
.withScheduledExecutor(Executors.newSingleThreadScheduledExecutor())
+                .withBudget(Budgets.iterationTimeBudget()
+                        .withMaxIterations(Integer.MAX_VALUE)
+                        .withInitialDelay(Duration.ofSeconds(1))
+                        .withUnlimitedDuration()
+                        .build())
+                .build();
+
+        boolean completed = task.run(this::taskPredicateWithDeterministicStop, 
4);
+        assertTrue(maxIterations > taskCount, "The task execution should not 
exceed the max iterations");
+        assertTrue(completed, "The task did not complete, the return should be 
false");
+    }
 }

Reply via email to