This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 057975c13c ARTEMIS-5464 Simplifying ClusteredLargeMessageInterruptTest
057975c13c is described below
commit 057975c13c878302cbc2b3f77b6e9d81c3c6866c
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon May 5 15:10:43 2025 -0400
ARTEMIS-5464 Simplifying ClusteredLargeMessageInterruptTest
---
.../ClusteredLargeMessageInterruptTest.java | 203 +++++++++------------
1 file changed, 87 insertions(+), 116 deletions(-)
diff --git
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java
index 514ac42b7f..72ffd6cf93 100644
---
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java
+++
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java
@@ -57,6 +57,9 @@ public class ClusteredLargeMessageInterruptTest extends
SoakTestBase {
public static final String SERVER_NAME_0 = "lmbroker1";
public static final String SERVER_NAME_1 = "lmbroker2";
+ int COMBINATION_FACTOR = 6; // update this if you change the logic on the
starting loops
+ private static final String[] protocolList = new String[] {"CORE", "AMQP",
"OPENWIRE"};
+
@BeforeAll
public static void createServers() throws Exception {
{
@@ -64,7 +67,7 @@ public class ClusteredLargeMessageInterruptTest extends
SoakTestBase {
deleteDirectory(serverLocation);
HelperCreate cliCreateServer = helperCreate();
-
cliCreateServer.setRole("amq").setUser("artemis").setPassword("artemis").setAllowAnonymous(true).setNoWeb(false).setArtemisInstance(serverLocation).
+
cliCreateServer.setRole("amq").setUser("artemis").setPassword("artemis").setAllowAnonymous(true).setArtemisInstance(serverLocation).
setConfiguration("./src/main/resources/servers/lmbroker1");
cliCreateServer.setArgs("--java-options",
"-Djava.rmi.server.hostname=localhost", "--clustered", "--static-cluster",
"tcp://localhost:61716", "--queues", "ClusteredLargeMessageInterruptTest",
"--name", "lmbroker1");
cliCreateServer.createServer();
@@ -130,108 +133,89 @@ public class ClusteredLargeMessageInterruptTest extends
SoakTestBase {
return startServer(SERVER_NAME_1, 100, 30000);
}
- @Test
- public void testLargeMessageAMQPTX() throws Throwable {
- testInterrupt("AMQP", true);
- }
-
- @Test
- public void testInterruptAMQPNonTX() throws Throwable {
- testInterrupt("AMQP", false);
- }
-
- @Test
- public void testInterruptCORETX() throws Throwable {
- testInterrupt("CORE", true);
- }
-
- @Test
- public void testInterruptOPENWIRETX() throws Throwable {
- testInterrupt("OPENWIRE", true);
- }
-
- @Test
- public void testInterruptCORENonTX() throws Throwable {
- testInterrupt("CORE", false);
- }
-
- private CountDownLatch startSendingThreads(Executor executor, String
protocol, int broker, int threads, boolean tx, String queueName) {
+ private CountDownLatch startSendingThreads(Executor executor, int broker,
String queueName) {
runningSend = true;
- CountDownLatch done = new CountDownLatch(threads);
-
- ConnectionFactory factory = createConnectionFactory(broker, protocol);
- final CyclicBarrier startFlag = new CyclicBarrier(threads);
-
- for (int i = 0; i < threads; i++) {
- int threadID = i;
- executor.execute(() -> {
- int numberOfMessages = 0;
- try {
- Connection connection = factory.createConnection();
- Session session = connection.createSession(tx, tx ?
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer =
session.createProducer(session.createQueue(queueName));
-
- startFlag.await(10, TimeUnit.SECONDS);
- while (runningSend) {
- producer.send(session.createTextMessage(largebody));
- if (tx) {
- session.commit();
- }
- if (numberOfMessages++ % 10 == 0) {
- logger.info("Sent {}", numberOfMessages);
+
+ CountDownLatch done = new CountDownLatch(COMBINATION_FACTOR);
+ final CyclicBarrier startFlag = new CyclicBarrier(COMBINATION_FACTOR);
+
+ int threadCounter = 0;
+ for (String protocol : protocolList) {
+ for (int i = 0; i <= 1; i++) {
+ boolean tx = i > 0;
+ int threadID = threadCounter++;
+ executor.execute(() -> {
+ ConnectionFactory factory = createConnectionFactory(broker,
protocol);
+ int numberOfMessages = 0;
+ try {
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(tx, tx ?
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer =
session.createProducer(session.createQueue(queueName));
+
+ startFlag.await(10, TimeUnit.SECONDS);
+ while (runningSend) {
+ producer.send(session.createTextMessage(largebody));
+ if (tx) {
+ session.commit();
+ }
+ if (numberOfMessages++ % 10 == 0) {
+ logger.info("Sent {}", numberOfMessages);
+ }
}
+ } catch (Exception e) {
+ logger.info("Thread {} got an error {}", threadID,
e.getMessage());
+ } finally {
+ done.countDown();
+ logger.info("CountDown:: current Count {}", done.getCount());
}
- } catch (Exception e) {
- logger.info("Thread {} got an error {}", threadID,
e.getMessage());
- } finally {
- done.countDown();
- logger.info("CountDown:: current Count {}", done.getCount());
- }
- });
+ });
+ }
}
return done;
}
- private CountDownLatch startConsumingThreads(Executor executor, String
protocol, int broker, int threads, boolean tx, String queueName) {
+ private CountDownLatch startConsumingThreads(Executor executor, int broker,
String queueName) {
+ CountDownLatch done = new CountDownLatch(COMBINATION_FACTOR);
+ final CyclicBarrier startFlag = new CyclicBarrier(COMBINATION_FACTOR);
+
runningConsumer = true;
- CountDownLatch done = new CountDownLatch(threads);
-
- ConnectionFactory factory = createConnectionFactory(broker, protocol);
- final CyclicBarrier startFlag = new CyclicBarrier(threads);
-
- for (int i = 0; i < threads; i++) {
- executor.execute(() -> {
- int numberOfMessages = 0;
- try (Connection connection = factory.createConnection()) {
- connection.start();
- Session session = connection.createSession(tx, tx ?
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer =
session.createConsumer(session.createQueue(queueName));
-
- startFlag.await(10, TimeUnit.SECONDS);
- while (runningConsumer) {
- TextMessage message = (TextMessage)consumer.receive(100);
- if (message != null) {
- if (!message.getText().startsWith(largebody)) {
- logger.warn("Body does not match!");
- errors.incrementAndGet();
- }
- if (tx) {
- session.commit();
- }
- if (numberOfMessages++ % 10 == 0) {
- logger.info("Received {}", numberOfMessages);
+ for (String protocol : protocolList) {
+ for (int i = 0; i <= 1; i++) {
+ boolean tx = i > 0;
+ executor.execute(() -> {
+ int numberOfMessages = 0;
+ ConnectionFactory factory = createConnectionFactory(broker,
protocol);
+ try (Connection connection = factory.createConnection()) {
+ connection.start();
+ Session session = connection.createSession(tx, tx ?
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer =
session.createConsumer(session.createQueue(queueName));
+
+ startFlag.await(10, TimeUnit.SECONDS);
+ while (runningConsumer) {
+ TextMessage message = (TextMessage) consumer.receive(100);
+ if (message != null) {
+ if (!message.getText().startsWith(largebody)) {
+ logger.warn("Body does not match!");
+ errors.incrementAndGet();
+ }
+ if (tx) {
+ session.commit();
+ }
+ if (numberOfMessages++ % 10 == 0) {
+ logger.info("Received {}", numberOfMessages);
+ }
}
}
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ } finally {
+ logger.info("Done sending");
+ done.countDown();
}
- } catch (Exception e) {
- logger.warn(e.getMessage(), e);
- } finally {
- logger.info("Done sending");
- done.countDown();
- }
- });
+ });
+ }
}
return done;
@@ -241,22 +225,21 @@ public class ClusteredLargeMessageInterruptTest extends
SoakTestBase {
// this test has sleeps as the test will send while still active
// we keep sending all the time.. so the testInterruptLM acts like a
controller telling the threads when to stop
- private void testInterrupt(String protocol, boolean tx) throws Throwable {
- final int SENDING_THREADS = 10;
- final int CONSUMING_THREADS = 10;
+ @Test
+ public void testInterrupt() throws Throwable {
final AtomicInteger errors = new AtomicInteger(0); // I don't expect
many errors since this test is disconnecting and reconnecting the server
String queueName = "ClusteredLargeMessageInterruptTest";
- ExecutorService executorService =
Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS);
+ ExecutorService executorService =
Executors.newFixedThreadPool(COMBINATION_FACTOR * 2);
runAfter(executorService::shutdownNow);
File lmFolder = new File(getServerLocation(SERVER_NAME_0) +
"/data/large-messages");
File lmFolder2 = new File(getServerLocation(SERVER_NAME_1) +
"/data/large-messages");
{
- CountDownLatch sendDone = startSendingThreads(executorService,
protocol, 0, SENDING_THREADS, tx, queueName);
- CountDownLatch receiverDone = startConsumingThreads(executorService,
protocol, 1, CONSUMING_THREADS, tx, queueName);
+ CountDownLatch sendDone = startSendingThreads(executorService, 0,
queueName);
+ CountDownLatch receiverDone = startConsumingThreads(executorService,
1, queueName);
// let it producing for a while
Thread.sleep(2000);
@@ -268,11 +251,11 @@ public class ClusteredLargeMessageInterruptTest extends
SoakTestBase {
assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));
serverProcess = startServer0();
runningConsumer = false;
- assertTrue(receiverDone.await(1, TimeUnit.MINUTES));
+ assertTrue(receiverDone.await(5, TimeUnit.MINUTES));
long timeout = System.currentTimeMillis() + 60_000;
- ConnectionFactory factory = createConnectionFactory(1, protocol);
+ ConnectionFactory factory = createConnectionFactory(1, "CORE");
// This will flush all messages, making sure everything is consumed.
try (Connection connection = factory.createConnection()) {
@@ -300,35 +283,23 @@ public class ClusteredLargeMessageInterruptTest extends
SoakTestBase {
assertEquals(0, errors.get());
}
- @Test
- public void testBridgeFailureAMQP() throws Throwable {
- testInterruptFailOnBridge("AMQP", false);
- }
-
- @Test
- public void testBridgeFailureCORE() throws Throwable {
- testInterruptFailOnBridge("CORE", false);
- }
-
private void killProcess(Process process) throws Exception {
process.destroyForcibly();
}
-
// this is a slight variation of testInterruptLM where I switch over
consumers before killing the previous node
// this is to force messages being redistributed and try to get the bridge
to failure.
// I could played with a parameter but ellected to copy instead for
simplicity
- private void testInterruptFailOnBridge(String protocol, boolean tx) throws
Throwable {
- final int SENDING_THREADS = 10;
- final int CONSUMING_THREADS = 10;
+ @Test
+ public void testInterruptFailOnBridge() throws Throwable {
final AtomicInteger errors = new AtomicInteger(0); // I don't expect
many errors since this test is disconnecting and reconnecting the server
String queueName = "ClusteredLargeMessageInterruptTest";
- ExecutorService executorService =
Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS);
+ ExecutorService executorService =
Executors.newFixedThreadPool(COMBINATION_FACTOR * 2);
runAfter(executorService::shutdownNow);
- CountDownLatch sendDone = startSendingThreads(executorService, protocol,
0, SENDING_THREADS, tx, queueName);
+ CountDownLatch sendDone = startSendingThreads(executorService, 0,
queueName);
Thread.sleep(2000);
@@ -338,8 +309,8 @@ public class ClusteredLargeMessageInterruptTest extends
SoakTestBase {
assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));
assertTrue(sendDone.await(1, TimeUnit.MINUTES));
- sendDone = startSendingThreads(executorService, protocol, 1,
SENDING_THREADS, tx, queueName);
- CountDownLatch receiverDone = startConsumingThreads(executorService,
protocol, 1, CONSUMING_THREADS, tx, queueName);
+ sendDone = startSendingThreads(executorService, 1, queueName);
+ CountDownLatch receiverDone = startConsumingThreads(executorService, 1,
queueName);
killProcess(serverProcess);
assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));
serverProcess = startServer0();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact