This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 53cc4633cab3867823202f68a783d3fcc4ac9c6a Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Tue Jan 16 11:02:15 2024 +0100 CAMEL-20297 camel-core-processor: do not swallow interrupted exceptions --- .../java/org/apache/camel/processor/Throttler.java | 2 ++ .../camel/processor/ThrottlerMethodCallTest.java | 37 ++++++++++++++++++---- 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Throttler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Throttler.java index edaf2f49ccd..2a7df9052be 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Throttler.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Throttler.java @@ -173,6 +173,7 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa private static boolean handleInterrupt( Exchange exchange, AsyncCallback callback, InterruptedException e, boolean doneSync) { + Thread.currentThread().interrupt(); // determine if we can still run, or the camel context is forcing a shutdown boolean forceShutdown = exchange.getContext().getShutdownStrategy().isForceShutdown(); if (forceShutdown) { @@ -384,6 +385,7 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa // honours fairness setting return super.tryAcquire(0L, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); return false; } } diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerMethodCallTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerMethodCallTest.java index ac51a51197d..16f59048b60 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerMethodCallTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerMethodCallTest.java @@ -18,19 +18,28 @@ package org.apache.camel.processor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.camel.ContextTestSupport; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.spi.Registry; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.OS; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @DisabledOnOs(OS.WINDOWS) public class ThrottlerMethodCallTest extends ContextTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(ThrottlerMethodCallTest.class); private static final int INTERVAL = 100; protected int messageCount = 10; + private MockEndpoint resultEndpoint; + private ExecutorService executor; @Override protected Registry createRegistry() throws Exception { @@ -43,21 +52,35 @@ public class ThrottlerMethodCallTest extends ContextTestSupport { return 3; } - @Test - public void testConfigurationWithMethodCallExpression() throws Exception { - MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class); + @BeforeEach + public void prepareTest() { + resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class); resultEndpoint.expectedMessageCount(messageCount); - ExecutorService executor = Executors.newFixedThreadPool(messageCount); + executor = Executors.newFixedThreadPool(messageCount); + } + + @AfterEach + public void cleanupTest() throws InterruptedException { + executor.shutdown(); + if (!executor.awaitTermination(2, TimeUnit.SECONDS)) { + LOG.warn("The tasks did not finish within the expected time"); + executor.shutdownNow(); + } + } + @Test + public void testConfigurationWithMethodCallExpression() { for (int i = 0; i < messageCount; i++) { executor.execute(() -> template.sendBody("direct:expressionMethod", "<message>payload</message>")); } // let's wait for the exchanges to arrive - resultEndpoint.assertIsSatisfied(); - - executor.shutdownNow(); + try { + resultEndpoint.assertIsSatisfied(); + } catch (InterruptedException e) { + Assertions.fail(e); + } } @Override