This is an automated email from the ASF dual-hosted git repository. gnodet pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new ae372c3 Make camel-netty tests run in parallel ae372c3 is described below commit ae372c39a0807250ffc304bc86c57c8a64be7868 Author: Guillaume Nodet <gno...@gmail.com> AuthorDate: Tue Mar 23 17:14:11 2021 +0100 Make camel-netty tests run in parallel --- components/camel-netty/pom.xml | 22 ++++-- .../camel/component/netty/BaseNettyTest.java | 21 ++--- .../netty/ErrorDuringGracefullShutdownTest.java | 3 + .../camel/component/netty/LogCaptureTest.java | 2 + .../camel/component/netty/NettyProxyTest.java | 17 ++-- ...ttyUseSharedWorkerThreadPoolManyRoutesTest.java | 20 ++++- .../netty/NettyUseSharedWorkerThreadPoolTest.java | 28 +++---- .../netty/UnsharableCodecsConflictsTest.java | 17 ++-- .../org/apache/camel/test/AvailablePortFinder.java | 91 ++++++++++++++++++++-- 9 files changed, 159 insertions(+), 62 deletions(-) diff --git a/components/camel-netty/pom.xml b/components/camel-netty/pom.xml index 5245142..a1a6169 100644 --- a/components/camel-netty/pom.xml +++ b/components/camel-netty/pom.xml @@ -32,6 +32,11 @@ <name>Camel :: Netty</name> <description>Camel Netty NIO based socket communication component</description> + <properties> + <camel.surefire.parallel>true</camel.surefire.parallel> + <io.netty.leakDetection.level>DISABLED</io.netty.leakDetection.level> + </properties> + <dependencies> <dependency> <groupId>org.apache.camel</groupId> @@ -134,20 +139,25 @@ </execution> </executions> </plugin> - <plugin> <artifactId>maven-surefire-plugin</artifactId> <configuration> - <forkCount>1</forkCount> - <reuseForks>false</reuseForks> - <forkedProcessTimeoutInSeconds>5000</forkedProcessTimeoutInSeconds> <systemPropertyVariables> - <!-- can use PARANOID for checking every access --> - <io.netty.leakDetection.level>ADVANCED</io.netty.leakDetection.level> + <io.netty.leakDetection.level>${io.netty.leakDetection.level}</io.netty.leakDetection.level> </systemPropertyVariables> </configuration> </plugin> </plugins> </build> + <profiles> + <profile> + <id>leaks</id> + <properties> + <camel.surefire.parallel>false</camel.surefire.parallel> + <io.netty.leakDetection.level>PARANOID</io.netty.leakDetection.level> + </properties> + </profile> + </profiles> + </project> diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java index 8d40832..7236730 100644 --- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java +++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java @@ -28,6 +28,7 @@ import org.apache.camel.test.junit5.CamelTestSupport; import org.apache.logging.log4j.core.LogEvent; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,19 +39,15 @@ public class BaseNettyTest extends CamelTestSupport { private static final Logger LOG = LoggerFactory.getLogger(BaseNettyTest.class); - private static volatile int port; - - @BeforeAll - public static void initPort() throws Exception { - port = AvailablePortFinder.getNextAvailable(); - } + @RegisterExtension + protected AvailablePortFinder.Port port = AvailablePortFinder.find(); @BeforeAll public static void startLeakDetection() { System.setProperty("io.netty.leakDetection.maxRecords", "100"); System.setProperty("io.netty.leakDetection.acquireAndReleaseOnly", "true"); System.setProperty("io.netty.leakDetection.targetRecords", "100"); - ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); + LogCaptureAppender.reset(); } @AfterAll @@ -64,9 +61,8 @@ public class BaseNettyTest extends CamelTestSupport { String message = "Leaks detected while running tests: " + events; // Just write the message into log to help debug for (LogEvent event : events) { - LOG.info(event.getMessage().getFormattedMessage()); + LOG.info(event.getMessage().toString()); } - LogCaptureAppender.reset(); throw new AssertionError(message); } } @@ -87,13 +83,8 @@ public class BaseNettyTest extends CamelTestSupport { return prop; } - protected int getNextPort() { - port = AvailablePortFinder.getNextAvailable(); - return port; - } - protected int getPort() { - return port; + return port.getPort(); } protected String byteArrayToHex(byte[] bytes) { diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/ErrorDuringGracefullShutdownTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/ErrorDuringGracefullShutdownTest.java index 6771cc5..0b31b10 100644 --- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/ErrorDuringGracefullShutdownTest.java +++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/ErrorDuringGracefullShutdownTest.java @@ -21,13 +21,16 @@ import org.apache.camel.ServiceStatus; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.processor.errorhandler.DefaultErrorHandler; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Isolated; import static org.assertj.core.api.Assertions.assertThat; /** * Regression test for CAMEL-9527 */ +@Isolated class ErrorDuringGracefullShutdownTest extends BaseNettyTest { + @Override protected RoutesBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureTest.java index e9656ff..40c3df8 100644 --- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureTest.java +++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureTest.java @@ -20,6 +20,7 @@ import io.netty.util.ResourceLeakDetector; import io.netty.util.internal.logging.InternalLoggerFactory; import org.apache.camel.processor.errorhandler.DefaultErrorHandler; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Isolated; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -27,6 +28,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; /** * This test ensures LogCaptureAppender is configured properly */ +@Isolated public class LogCaptureTest { @Test public void testCapture() { diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyProxyTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyProxyTest.java index 3057d5f..2e8ad5e 100644 --- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyProxyTest.java +++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyProxyTest.java @@ -17,7 +17,9 @@ package org.apache.camel.component.netty; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.AvailablePortFinder; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -26,8 +28,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; */ public class NettyProxyTest extends BaseNettyTest { - private int port1; - private int port2; + @RegisterExtension + protected AvailablePortFinder.Port port2 = AvailablePortFinder.find(); @Test public void testNettyProxy() throws Exception { @@ -35,7 +37,7 @@ public class NettyProxyTest extends BaseNettyTest { getMockEndpoint("mock:proxy").expectedBodiesReceived("Camel"); getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel"); - Object body = template.requestBody("netty:tcp://localhost:" + port1 + "?sync=true&textline=true", "Camel\n"); + Object body = template.requestBody("netty:tcp://localhost:" + port.getPort() + "?sync=true&textline=true", "Camel\n"); assertEquals("Bye Camel", body); assertMockEndpointsSatisfied(); @@ -46,15 +48,12 @@ public class NettyProxyTest extends BaseNettyTest { return new RouteBuilder() { @Override public void configure() throws Exception { - port1 = getPort(); - port2 = getNextPort(); - - fromF("netty:tcp://localhost:%s?sync=true&textline=true", port1) + fromF("netty:tcp://localhost:%s?sync=true&textline=true", port.getPort()) .to("mock:before") - .toF("netty:tcp://localhost:%s?sync=true&textline=true", port2) + .toF("netty:tcp://localhost:%s?sync=true&textline=true", port2.getPort()) .to("mock:after"); - fromF("netty:tcp://localhost:%s?sync=true&textline=true", port2) + fromF("netty:tcp://localhost:%s?sync=true&textline=true", port2.getPort()) .to("mock:proxy") .transform().simple("Bye ${body}\n"); } diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolManyRoutesTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolManyRoutesTest.java index 09ea897..6c81982 100644 --- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolManyRoutesTest.java +++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolManyRoutesTest.java @@ -19,6 +19,8 @@ package org.apache.camel.component.netty; import io.netty.channel.EventLoopGroup; import org.apache.camel.BindToRegistry; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.AvailablePortFinder; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -35,6 +37,7 @@ public class NettyUseSharedWorkerThreadPoolManyRoutesTest extends BaseNettyTest @BindToRegistry("sharedBoss") private EventLoopGroup sharedWorkerGroup = new NettyServerBossPoolBuilder().withBossCount(20).build(); private int before; + protected AvailablePortFinder.Port[] ports; @Override protected boolean useJmx() { @@ -45,9 +48,22 @@ public class NettyUseSharedWorkerThreadPoolManyRoutesTest extends BaseNettyTest @BeforeEach public void setUp() throws Exception { before = Thread.activeCount(); + ports = new AvailablePortFinder.Port[60]; + for (int i = 0; i < ports.length; i++) { + ports[i] = AvailablePortFinder.find(); + } super.setUp(); } + @Override + @AfterEach + public void tearDown() throws Exception { + super.tearDown(); + for (AvailablePortFinder.Port port : ports) { + port.release(); + } + } + @Test public void testSharedThreadPool() throws Exception { int delta = Thread.activeCount() - before; @@ -65,8 +81,8 @@ public class NettyUseSharedWorkerThreadPoolManyRoutesTest extends BaseNettyTest @Override public void configure() throws Exception { - for (int i = 0; i < 60; i++) { - from("netty:tcp://localhost:" + getNextPort() + "?textline=true&sync=true&usingExecutorService=false" + for (AvailablePortFinder.Port port : ports) { + from("netty:tcp://localhost:" + port.getPort() + "?textline=true&sync=true&usingExecutorService=false" + "&bossGroup=#sharedBoss&workerGroup=#sharedWorker") .validate(body().isInstanceOf(String.class)).to("log:result").to("mock:result") .transform(body().regexReplaceAll("Hello", "Bye")); diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolTest.java index 931ba90..8266abc 100644 --- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolTest.java +++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolTest.java @@ -19,7 +19,9 @@ package org.apache.camel.component.netty; import io.netty.channel.EventLoopGroup; import org.apache.camel.BindToRegistry; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.AvailablePortFinder; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -31,9 +33,10 @@ public class NettyUseSharedWorkerThreadPoolTest extends BaseNettyTest { @BindToRegistry("sharedClientPool") private EventLoopGroup sharedWorkerClientGroup = new NettyWorkerPoolBuilder().withWorkerCount(3).withName("NettyClient").build(); - private int port; - private int port2; - private int port3; + @RegisterExtension + protected AvailablePortFinder.Port port2 = AvailablePortFinder.find(); + @RegisterExtension + protected AvailablePortFinder.Port port3 = AvailablePortFinder.find(); @Override protected boolean useJmx() { @@ -46,17 +49,20 @@ public class NettyUseSharedWorkerThreadPoolTest extends BaseNettyTest { for (int i = 0; i < 10; i++) { String reply = template.requestBody( - "netty:tcp://localhost:" + port + "?textline=true&sync=true&workerGroup=#sharedClientPool", "Hello World", + "netty:tcp://localhost:" + port.getPort() + "?textline=true&sync=true&workerGroup=#sharedClientPool", + "Hello World", String.class); assertEquals("Bye World", reply); reply = template.requestBody( - "netty:tcp://localhost:" + port2 + "?textline=true&sync=true&workerGroup=#sharedClientPool", "Hello Camel", + "netty:tcp://localhost:" + port2.getPort() + "?textline=true&sync=true&workerGroup=#sharedClientPool", + "Hello Camel", String.class); assertEquals("Hi Camel", reply); reply = template.requestBody( - "netty:tcp://localhost:" + port3 + "?textline=true&sync=true&workerGroup=#sharedClientPool", "Hello Claus", + "netty:tcp://localhost:" + port3.getPort() + "?textline=true&sync=true&workerGroup=#sharedClientPool", + "Hello Claus", String.class); assertEquals("Hej Claus", reply); } @@ -73,21 +79,17 @@ public class NettyUseSharedWorkerThreadPoolTest extends BaseNettyTest { @Override public void configure() throws Exception { - port = getPort(); - port2 = getNextPort(); - port3 = getNextPort(); - - from("netty:tcp://localhost:" + port + from("netty:tcp://localhost:" + port.getPort() + "?textline=true&sync=true&workerGroup=#sharedServerPool&usingExecutorService=false") .validate(body().isInstanceOf(String.class)).to("log:result").to("mock:result") .transform(body().regexReplaceAll("Hello", "Bye")); - from("netty:tcp://localhost:" + port2 + from("netty:tcp://localhost:" + port2.getPort() + "?textline=true&sync=true&workerGroup=#sharedServerPool&usingExecutorService=false") .validate(body().isInstanceOf(String.class)).to("log:result").to("mock:result") .transform(body().regexReplaceAll("Hello", "Hi")); - from("netty:tcp://localhost:" + port3 + from("netty:tcp://localhost:" + port3.getPort() + "?textline=true&sync=true&workerGroup=#sharedServerPool&usingExecutorService=false") .validate(body().isInstanceOf(String.class)).to("log:result").to("mock:result") .transform(body().regexReplaceAll("Hello", "Hej")); diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflictsTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflictsTest.java index a829ca3..884fbae 100644 --- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflictsTest.java +++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflictsTest.java @@ -26,8 +26,10 @@ import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.util.IOHelper; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,8 +44,8 @@ public class UnsharableCodecsConflictsTest extends BaseNettyTest { private Processor processor = new P(); - private int port1; - private int port2; + @RegisterExtension + protected AvailablePortFinder.Port port2 = AvailablePortFinder.find(); @BindToRegistry("length-decoder") private ChannelHandlerFactory decoder = ChannelHandlerFactories.newLengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4); @@ -63,8 +65,8 @@ public class UnsharableCodecsConflictsTest extends BaseNettyTest { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedBodiesReceived(new String(sPort2) + "9"); - Socket server1 = getSocket("localhost", port1); - Socket server2 = getSocket("localhost", port2); + Socket server1 = getSocket("localhost", port.getPort()); + Socket server2 = getSocket("localhost", port2.getPort()); try { sendSopBuffer(bodyPort2, server2); @@ -84,12 +86,9 @@ public class UnsharableCodecsConflictsTest extends BaseNettyTest { protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { public void configure() throws Exception { - port1 = getPort(); - port2 = getNextPort(); + from("netty:tcp://localhost:" + port.getPort() + "?decoders=#length-decoder&sync=false").process(processor); - from("netty:tcp://localhost:" + port1 + "?decoders=#length-decoder&sync=false").process(processor); - - from("netty:tcp://localhost:" + port2 + "?decoders=#length-decoder2&sync=false").process(processor) + from("netty:tcp://localhost:" + port2.getPort() + "?decoders=#length-decoder2&sync=false").process(processor) .to("mock:result"); } }; diff --git a/components/camel-test/camel-test-junit5/src/main/java/org/apache/camel/test/AvailablePortFinder.java b/components/camel-test/camel-test-junit5/src/main/java/org/apache/camel/test/AvailablePortFinder.java index 3adc24d..0073685 100644 --- a/components/camel-test/camel-test-junit5/src/main/java/org/apache/camel/test/AvailablePortFinder.java +++ b/components/camel-test/camel-test-junit5/src/main/java/org/apache/camel/test/AvailablePortFinder.java @@ -20,7 +20,11 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,6 +35,25 @@ public final class AvailablePortFinder { private static final Logger LOG = LoggerFactory.getLogger(AvailablePortFinder.class); + private static final AvailablePortFinder INSTANCE = new AvailablePortFinder(); + + public interface Port extends AfterAllCallback, AutoCloseable { + int getPort(); + + void release(); + + default void afterAll(ExtensionContext context) throws Exception { + release(); + } + + @Override + default void close() { + release(); + } + } + + private final Map<Integer, Port> portMapping = new ConcurrentHashMap<>(); + /** * Creates a new instance. */ @@ -38,6 +61,61 @@ public final class AvailablePortFinder { // Do nothing } + public static Port find() { + return INSTANCE.findPort(); + } + + synchronized Port findPort() { + while (true) { + final int port = probePort(0); + Port p = new Port() { + @Override + public int getPort() { + return port; + } + + @Override + public void release() { + AvailablePortFinder.this.release(this); + } + }; + Port prv = INSTANCE.portMapping.putIfAbsent(port, p); + if (prv == null) { + return p; + } + } + } + + synchronized Port findPort(int fromPort, int toPort) { + for (int i = fromPort; i <= toPort; i++) { + try { + final int port = probePort(i); + Port p = new Port() { + @Override + public int getPort() { + return port; + } + + @Override + public void release() { + AvailablePortFinder.this.release(this); + } + }; + Port prv = INSTANCE.portMapping.putIfAbsent(port, p); + if (prv == null) { + return p; + } + } catch (IllegalStateException e) { + // do nothing, let's try the next port + } + } + throw new IllegalStateException("Cannot find free port"); + } + + synchronized void release(Port port) { + INSTANCE.portMapping.remove(port.getPort(), port); + } + /** * Gets the next available port. * @@ -45,7 +123,9 @@ public final class AvailablePortFinder { * @return the available port */ public static int getNextAvailable() { - return probePort(0); + try (Port port = INSTANCE.findPort()) { + return port.getPort(); + } } /** @@ -58,14 +138,9 @@ public final class AvailablePortFinder { * @return the available port */ public static int getNextAvailable(int fromPort, int toPort) { - for (int i = fromPort; i <= toPort; i++) { - try { - return probePort(i); - } catch (IllegalStateException e) { - // do nothing, let's try the next port - } + try (Port port = INSTANCE.findPort(fromPort, toPort)) { + return port.getPort(); } - throw new IllegalStateException("Cannot find free port"); } /**