This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 057975c13c ARTEMIS-5464 Simplifying ClusteredLargeMessageInterruptTest
057975c13c is described below

commit 057975c13c878302cbc2b3f77b6e9d81c3c6866c
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon May 5 15:10:43 2025 -0400

    ARTEMIS-5464 Simplifying ClusteredLargeMessageInterruptTest
---
 .../ClusteredLargeMessageInterruptTest.java        | 203 +++++++++------------
 1 file changed, 87 insertions(+), 116 deletions(-)

diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java
index 514ac42b7f..72ffd6cf93 100644
--- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java
+++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java
@@ -57,6 +57,9 @@ public class ClusteredLargeMessageInterruptTest extends SoakTestBase {
    public static final String SERVER_NAME_0 = "lmbroker1";
    public static final String SERVER_NAME_1 = "lmbroker2";
 
+   int COMBINATION_FACTOR = 6; // update this if you change the logic on the starting loops
+   private static final String[] protocolList = new String[] {"CORE", "AMQP", "OPENWIRE"};
+
    @BeforeAll
    public static void createServers() throws Exception {
       {
@@ -64,7 +67,7 @@ public class ClusteredLargeMessageInterruptTest extends SoakTestBase {
          deleteDirectory(serverLocation);
 
          HelperCreate cliCreateServer = helperCreate();
-         cliCreateServer.setRole("amq").setUser("artemis").setPassword("artemis").setAllowAnonymous(true).setNoWeb(false).setArtemisInstance(serverLocation).
+         cliCreateServer.setRole("amq").setUser("artemis").setPassword("artemis").setAllowAnonymous(true).setArtemisInstance(serverLocation).
             setConfiguration("./src/main/resources/servers/lmbroker1");
          cliCreateServer.setArgs("--java-options", "-Djava.rmi.server.hostname=localhost", "--clustered", "--static-cluster", "tcp://localhost:61716", "--queues", "ClusteredLargeMessageInterruptTest", "--name", "lmbroker1");
          cliCreateServer.createServer();
@@ -130,108 +133,89 @@ public class ClusteredLargeMessageInterruptTest extends SoakTestBase {
       return startServer(SERVER_NAME_1, 100, 30000);
    }
 
-   @Test
-   public void testLargeMessageAMQPTX() throws Throwable {
-      testInterrupt("AMQP", true);
-   }
-
-   @Test
-   public void testInterruptAMQPNonTX() throws Throwable {
-      testInterrupt("AMQP", false);
-   }
-
-   @Test
-   public void testInterruptCORETX() throws Throwable {
-      testInterrupt("CORE", true);
-   }
-
-   @Test
-   public void testInterruptOPENWIRETX() throws Throwable {
-      testInterrupt("OPENWIRE", true);
-   }
-
-   @Test
-   public void testInterruptCORENonTX() throws Throwable {
-      testInterrupt("CORE", false);
-   }
-
-   private CountDownLatch startSendingThreads(Executor executor, String 
protocol, int broker, int threads, boolean tx, String queueName) {
+   private CountDownLatch startSendingThreads(Executor executor, int broker, 
String queueName) {
       runningSend = true;
-      CountDownLatch done = new CountDownLatch(threads);
-
-      ConnectionFactory factory = createConnectionFactory(broker, protocol);
-      final CyclicBarrier startFlag = new CyclicBarrier(threads);
-
-      for (int i = 0; i < threads; i++) {
-         int threadID = i;
-         executor.execute(() -> {
-            int numberOfMessages = 0;
-            try {
-               Connection connection = factory.createConnection();
-               Session session = connection.createSession(tx, tx ? 
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
-               MessageProducer producer = 
session.createProducer(session.createQueue(queueName));
-
-               startFlag.await(10, TimeUnit.SECONDS);
-               while (runningSend) {
-                  producer.send(session.createTextMessage(largebody));
-                  if (tx) {
-                     session.commit();
-                  }
-                  if (numberOfMessages++ % 10 == 0) {
-                     logger.info("Sent {}", numberOfMessages);
+
+      CountDownLatch done = new CountDownLatch(COMBINATION_FACTOR);
+      final CyclicBarrier startFlag = new CyclicBarrier(COMBINATION_FACTOR);
+
+      int threadCounter = 0;
+      for (String protocol : protocolList) {
+         for (int i = 0; i <= 1; i++) {
+            boolean tx = i > 0;
+            int threadID = threadCounter++;
+            executor.execute(() -> {
+               ConnectionFactory factory = createConnectionFactory(broker, 
protocol);
+               int numberOfMessages = 0;
+               try {
+                  Connection connection = factory.createConnection();
+                  Session session = connection.createSession(tx, tx ? 
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+                  MessageProducer producer = 
session.createProducer(session.createQueue(queueName));
+
+                  startFlag.await(10, TimeUnit.SECONDS);
+                  while (runningSend) {
+                     producer.send(session.createTextMessage(largebody));
+                     if (tx) {
+                        session.commit();
+                     }
+                     if (numberOfMessages++ % 10 == 0) {
+                        logger.info("Sent {}", numberOfMessages);
+                     }
                   }
+               } catch (Exception e) {
+                  logger.info("Thread {} got an error {}", threadID, 
e.getMessage());
+               } finally {
+                  done.countDown();
+                  logger.info("CountDown:: current Count {}", done.getCount());
                }
-            } catch (Exception e) {
-               logger.info("Thread {} got an error {}", threadID, 
e.getMessage());
-            } finally {
-               done.countDown();
-               logger.info("CountDown:: current Count {}", done.getCount());
-            }
-         });
+            });
+         }
       }
 
       return done;
    }
 
 
-   private CountDownLatch startConsumingThreads(Executor executor, String 
protocol, int broker, int threads, boolean tx, String queueName) {
+   private CountDownLatch startConsumingThreads(Executor executor, int broker, 
String queueName) {
+      CountDownLatch done = new CountDownLatch(COMBINATION_FACTOR);
+      final CyclicBarrier startFlag = new CyclicBarrier(COMBINATION_FACTOR);
+
       runningConsumer = true;
-      CountDownLatch done = new CountDownLatch(threads);
-
-      ConnectionFactory factory = createConnectionFactory(broker, protocol);
-      final CyclicBarrier startFlag = new CyclicBarrier(threads);
-
-      for (int i = 0; i < threads; i++) {
-         executor.execute(() -> {
-            int numberOfMessages = 0;
-            try (Connection connection = factory.createConnection()) {
-               connection.start();
-               Session session = connection.createSession(tx, tx ? 
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
-               MessageConsumer consumer = 
session.createConsumer(session.createQueue(queueName));
-
-               startFlag.await(10, TimeUnit.SECONDS);
-               while (runningConsumer) {
-                  TextMessage message = (TextMessage)consumer.receive(100);
-                  if (message != null) {
-                     if (!message.getText().startsWith(largebody)) {
-                        logger.warn("Body does not match!");
-                        errors.incrementAndGet();
-                     }
-                     if (tx) {
-                        session.commit();
-                     }
-                     if (numberOfMessages++ % 10 == 0) {
-                        logger.info("Received {}", numberOfMessages);
+      for (String protocol : protocolList) {
+         for (int i = 0; i <= 1; i++) {
+            boolean tx = i > 0;
+            executor.execute(() -> {
+               int numberOfMessages = 0;
+               ConnectionFactory factory = createConnectionFactory(broker, 
protocol);
+               try (Connection connection = factory.createConnection()) {
+                  connection.start();
+                  Session session = connection.createSession(tx, tx ? 
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+                  MessageConsumer consumer = 
session.createConsumer(session.createQueue(queueName));
+
+                  startFlag.await(10, TimeUnit.SECONDS);
+                  while (runningConsumer) {
+                     TextMessage message = (TextMessage) consumer.receive(100);
+                     if (message != null) {
+                        if (!message.getText().startsWith(largebody)) {
+                           logger.warn("Body does not match!");
+                           errors.incrementAndGet();
+                        }
+                        if (tx) {
+                           session.commit();
+                        }
+                        if (numberOfMessages++ % 10 == 0) {
+                           logger.info("Received {}", numberOfMessages);
+                        }
                      }
                   }
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+               } finally {
+                  logger.info("Done sending");
+                  done.countDown();
                }
-            } catch (Exception e) {
-               logger.warn(e.getMessage(), e);
-            } finally {
-               logger.info("Done sending");
-               done.countDown();
-            }
-         });
+            });
+         }
       }
 
       return done;
@@ -241,22 +225,21 @@ public class ClusteredLargeMessageInterruptTest extends SoakTestBase {
 
    // this test has sleeps as the test will send while still active
    // we keep sending all the time.. so the testInterruptLM acts like a controller telling the threads when to stop
-   private void testInterrupt(String protocol, boolean tx) throws Throwable {
-      final int SENDING_THREADS = 10;
-      final int CONSUMING_THREADS = 10;
+   @Test
+   public void testInterrupt() throws Throwable {
       final AtomicInteger errors = new AtomicInteger(0); // I don't expect 
many errors since this test is disconnecting and reconnecting the server
 
       String queueName = "ClusteredLargeMessageInterruptTest";
 
-      ExecutorService executorService = 
Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS);
+      ExecutorService executorService = 
Executors.newFixedThreadPool(COMBINATION_FACTOR * 2);
       runAfter(executorService::shutdownNow);
 
       File lmFolder = new File(getServerLocation(SERVER_NAME_0) + 
"/data/large-messages");
       File lmFolder2 = new File(getServerLocation(SERVER_NAME_1) + 
"/data/large-messages");
 
       {
-         CountDownLatch sendDone = startSendingThreads(executorService, 
protocol, 0, SENDING_THREADS, tx, queueName);
-         CountDownLatch receiverDone = startConsumingThreads(executorService, 
protocol, 1, CONSUMING_THREADS, tx, queueName);
+         CountDownLatch sendDone = startSendingThreads(executorService, 0, 
queueName);
+         CountDownLatch receiverDone = startConsumingThreads(executorService, 
1, queueName);
 
          // let it producing for a while
          Thread.sleep(2000);
@@ -268,11 +251,11 @@ public class ClusteredLargeMessageInterruptTest extends SoakTestBase {
          assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));
          serverProcess = startServer0();
          runningConsumer = false;
-         assertTrue(receiverDone.await(1, TimeUnit.MINUTES));
+         assertTrue(receiverDone.await(5, TimeUnit.MINUTES));
 
          long timeout = System.currentTimeMillis() + 60_000;
 
-         ConnectionFactory factory = createConnectionFactory(1, protocol);
+         ConnectionFactory factory = createConnectionFactory(1, "CORE");
 
          // This will flush all messages, making sure everything is consumed.
          try (Connection connection = factory.createConnection()) {
@@ -300,35 +283,23 @@ public class ClusteredLargeMessageInterruptTest extends SoakTestBase {
       assertEquals(0, errors.get());
    }
 
-   @Test
-   public void testBridgeFailureAMQP() throws Throwable {
-      testInterruptFailOnBridge("AMQP", false);
-   }
-
-   @Test
-   public void testBridgeFailureCORE() throws Throwable {
-      testInterruptFailOnBridge("CORE", false);
-   }
-
    private void killProcess(Process process) throws Exception {
       process.destroyForcibly();
    }
 
-
    // this is a slight variation of testInterruptLM where I switch over consumers before killing the previous node
    // this is to force messages being redistributed and try to get the bridge to failure.
    // I could played with a parameter but ellected to copy instead for simplicity
-   private void testInterruptFailOnBridge(String protocol, boolean tx) throws 
Throwable {
-      final int SENDING_THREADS = 10;
-      final int CONSUMING_THREADS = 10;
+   @Test
+   public void testInterruptFailOnBridge() throws Throwable {
       final AtomicInteger errors = new AtomicInteger(0); // I don't expect 
many errors since this test is disconnecting and reconnecting the server
 
       String queueName = "ClusteredLargeMessageInterruptTest";
 
-      ExecutorService executorService = 
Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS);
+      ExecutorService executorService = 
Executors.newFixedThreadPool(COMBINATION_FACTOR * 2);
       runAfter(executorService::shutdownNow);
 
-      CountDownLatch sendDone = startSendingThreads(executorService, protocol, 
0, SENDING_THREADS, tx, queueName);
+      CountDownLatch sendDone = startSendingThreads(executorService, 0, 
queueName);
 
       Thread.sleep(2000);
 
@@ -338,8 +309,8 @@ public class ClusteredLargeMessageInterruptTest extends SoakTestBase {
       assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));
       assertTrue(sendDone.await(1, TimeUnit.MINUTES));
 
-      sendDone = startSendingThreads(executorService, protocol, 1, 
SENDING_THREADS, tx, queueName);
-      CountDownLatch receiverDone = startConsumingThreads(executorService, 
protocol, 1, CONSUMING_THREADS, tx, queueName);
+      sendDone = startSendingThreads(executorService, 1, queueName);
+      CountDownLatch receiverDone = startConsumingThreads(executorService, 1, 
queueName);
       killProcess(serverProcess);
       assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));
       serverProcess = startServer0();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to