This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch wiretap in repository https://gitbox.apache.org/repos/asf/camel.git
commit 4ec85c5309cac2beca17455a5b352dbe2fcc62f9 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Wed Jun 21 16:39:17 2023 +0200 WireTapAbortPolicyTest is flaky --- .../camel/processor/WireTapAbortPolicyTest.java | 27 ++++++++++++++++++---- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java index 7eff82fdeab..0fbdc41cd8e 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java @@ -16,8 +16,11 @@ */ package org.apache.camel.processor; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; import org.apache.camel.ContextTestSupport; import org.apache.camel.builder.RouteBuilder; @@ -26,6 +29,7 @@ import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.parallel.Isolated; @@ -36,6 +40,10 @@ import static org.junit.jupiter.api.Assertions.fail; */ @Isolated public class WireTapAbortPolicyTest extends ContextTestSupport { + + final CountDownLatch latch = new CountDownLatch(1); + final CyclicBarrier barrier = new CyclicBarrier(2); + protected MockEndpoint tap; protected MockEndpoint result; protected ExecutorService pool; @@ -50,6 +58,7 @@ public class WireTapAbortPolicyTest extends ContextTestSupport { } @Test + @RepeatedTest(value = 1000) public void testSend() throws Exception { // hello must come first, as we have delay on the tapped route result.expectedMinimumMessageCount(2); @@ -62,6 +71,8 @@ public class WireTapAbortPolicyTest extends ContextTestSupport { fail("Task should be rejected"); } catch (Exception e) { assertIsInstanceOf(RejectedExecutionException.class, e.getCause()); + } finally { + latch.countDown(); } assertMockEndpointsSatisfied(); @@ -79,13 +90,14 @@ public class WireTapAbortPolicyTest extends ContextTestSupport { protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { public void configure() throws Exception { + // START SNIPPET: e1 // use a custom thread pool for sending tapped messages ExecutorService pool = new ThreadPoolBuilder(context) - // only allow 1 thread and 1 pending task - .poolSize(1) - .maxPoolSize(1) - .maxQueueSize(1) + // only allow 2 threads + .poolSize(2) + .maxPoolSize(2) + .maxQueueSize(0) // and about tasks .rejectedPolicy(ThreadPoolRejectedPolicy.Abort) .build(); @@ -95,7 +107,12 @@ public class WireTapAbortPolicyTest extends ContextTestSupport { .wireTap("direct:tap").executorService(pool).to("mock:result"); // END SNIPPET: e1 - from("direct:tap").delay(1000).to("mock:tap"); + from("direct:tap") + .process(e -> { + barrier.await(5, TimeUnit.SECONDS); + }) + .process(e -> latch.await(5, TimeUnit.SECONDS)) + .to("mock:tap"); } }; }