This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 33ed6c053ed1a1466a0f9e4698da6b54d881341c Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Fri Jan 12 10:57:38 2024 +0100 CAMEL-20297 camel-core-processor: do not swallow interrupted exceptions --- .../java/org/apache/camel/processor/DelayProcessorSupport.java | 7 +++++++ .../main/java/org/apache/camel/processor/MulticastProcessor.java | 2 +- .../src/main/java/org/apache/camel/processor/Resequencer.java | 1 + .../main/java/org/apache/camel/processor/StreamResequencer.java | 2 ++ .../org/apache/camel/processor/aggregate/AggregateProcessor.java | 1 + .../camel/processor/errorhandler/RedeliveryErrorHandler.java | 2 ++ 6 files changed, 14 insertions(+), 1 deletion(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java index 8f8c17b7037..337fc6043d7 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java @@ -102,6 +102,12 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor { delay(delay, exchange); // then continue routing return processor.process(exchange, callback); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + // exception occurred so we are done + exchange.setException(e); + callback.done(true); + return true; } catch (Exception e) { // exception occurred so we are done exchange.setException(e); @@ -138,6 +144,7 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor { delay(delay, exchange); } catch (InterruptedException ie) { exchange.setException(ie); + Thread.currentThread().interrupt(); } // then continue routing return processor.process(exchange, callback); diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java index 6efdaebff5d..0a2333ba89a 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -731,7 +731,7 @@ public class MulticastProcessor extends AsyncProcessorSupport try { Thread.sleep(delay); } catch (InterruptedException e) { - // ignore + Thread.currentThread().interrupt(); } runnable.run(); }); diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java index 0375a20e41b..68585f92c50 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java @@ -491,6 +491,7 @@ public class Resequencer extends AsyncProcessorSupport implements Navigate<Proce } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); break; } diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java index 76730878569..2771df6c9fb 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java @@ -243,6 +243,7 @@ public class StreamResequencer extends AsyncProcessorSupport try { Thread.sleep(getTimeout()); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // we were interrupted so break out exchange.setException(e); callback.done(true); @@ -303,6 +304,7 @@ public class StreamResequencer extends AsyncProcessorSupport deliveryRequestLock.unlock(); } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); break; } try { diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index d97227d0d65..e278a5db1e4 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -1703,6 +1703,7 @@ public class AggregateProcessor extends AsyncProcessorSupport try { Thread.sleep(100); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // break out as we got interrupted such as the JVM terminating LOG.warn("Interrupted while waiting for {} inflight exchanges to complete.", getInProgressCompleteExchanges()); break; diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java index 60063459add..21c66770330 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java @@ -827,6 +827,8 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport // as we do not want to continue routing (for example a task has been cancelled) exchange.setRouteStop(true); reactiveExecutor.schedule(callback); + + Thread.currentThread().interrupt(); } } } else {