This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 0452716590 ARTEMIS-5678 Add additional bridge tests
0452716590 is described below
commit 0452716590ba78a30fc87628b314cee44e62d4a1
Author: Timothy Bish <[email protected]>
AuthorDate: Tue Sep 23 10:14:25 2025 -0400
ARTEMIS-5678 Add additional bridge tests
Adds some tests between two server instances using bridge from and bridge to
policies to drain off messages from a durable subscription to a matching sub
on the other broker.
---
.../amqp/connect/AMQPBridgeServerToServerTest.java | 223 +++++++++++++++++++++
1 file changed, 223 insertions(+)
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeServerToServerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeServerToServerTest.java
index ca32d671ac..4d9ecc8e9a 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeServerToServerTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeServerToServerTest.java
@@ -27,6 +27,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
@@ -1383,4 +1384,226 @@ class AMQPBridgeServerToServerTest extends
AmqpClientTestSupport {
assertEquals("red", receiveAnother.getStringProperty("color"));
}
}
+
+ @Test
+ @Timeout(20)
+ public void testBrigeToFQQNUsedToDrainDurableConsumerSubscriptionQueue()
throws Exception {
+ logger.info("Test started: {}", getTestName());
+
+ final long MESSAGE_COUNT = 10;
+
+ remoteServer.start();
+ server.start();
+
+ final ConnectionFactory factoryLocal =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT);
+
+ try (Connection connection = factoryLocal.createConnection()) {
+ connection.setClientID("test-brigdes");
+ connection.start();
+
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final Topic topic = session.createTopic(getTestName());
+ final MessageConsumer consumer = session.createDurableConsumer(topic,
"bridge-sub");
+ final MessageProducer producer = session.createProducer(topic);
+
+ consumer.close();
+
+ for (int i = 0; i < MESSAGE_COUNT; ++i) {
+ final TextMessage message = session.createTextMessage("Message:" +
i);
+
+ message.setStringProperty("color", "red");
+
+ producer.send(message);
+ }
+ }
+
+ final String subscriptionQueueName =
server.bindingQuery(SimpleString.of(getTestName())).getQueueNames().get(0).toString();
+
+ assertNotNull(subscriptionQueueName);
+
+ final org.apache.activemq.artemis.core.server.Queue subscriptionQueue =
server.locateQueue(subscriptionQueueName);
+
+ assertNotNull(subscriptionQueue);
+ Wait.assertEquals(0L, () -> subscriptionQueue.getConsumerCount(), 5_000,
100);
+ Wait.assertEquals(MESSAGE_COUNT, () ->
subscriptionQueue.getMessageCount(), 5_000, 100);
+ assertTrue(subscriptionQueue.isDurable());
+
+ final ConnectionFactory factoryRemote =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT_REMOTE);
+
+ try (Connection connection = factoryRemote.createConnection()) {
+ connection.setClientID("test-brigdes");
+ connection.start();
+
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final Topic topic = session.createTopic(getTestName());
+ final MessageConsumer consumer = session.createDurableConsumer(topic,
"bridge-sub");
+
+ consumer.close();
+ }
+
+ final String remoteSubscriptionQueueName =
remoteServer.bindingQuery(SimpleString.of(getTestName())).getQueueNames().get(0).toString();
+
+ assertNotNull(remoteSubscriptionQueueName);
+
+ final org.apache.activemq.artemis.core.server.Queue
remoteSubscriptionQueue = remoteServer.locateQueue(subscriptionQueueName);
+
+ assertNotNull(remoteSubscriptionQueue);
+ Wait.assertEquals(0L, () -> remoteSubscriptionQueue.getConsumerCount(),
5_000, 100);
+ Wait.assertEquals(0L, () -> remoteSubscriptionQueue.getMessageCount(),
5_000, 100);
+ assertTrue(remoteSubscriptionQueue.isDurable());
+
+ assertEquals(subscriptionQueueName, remoteSubscriptionQueueName);
+
+ final AMQPBridgeQueuePolicyElement bridgePolicy = new
AMQPBridgeQueuePolicyElement();
+ bridgePolicy.setName("test-policy");
+ bridgePolicy.addToIncludes(getTestName(), subscriptionQueueName);
+ bridgePolicy.setRemoteAddress(getTestName() + "::" +
subscriptionQueueName); // Direct the policy on where to put the messages
+
+ final AMQPBridgeBrokerConnectionElement element = new
AMQPBridgeBrokerConnectionElement();
+ element.setName(getTestName());
+ element.addBridgeToQueuePolicy(bridgePolicy);
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:"
+ SERVER_PORT_REMOTE);
+ amqpConnection.setReconnectAttempts(10); // Limit reconnects
+ amqpConnection.setRetryInterval(50);
+ amqpConnection.addElement(element);
+
+ server.getConfiguration().getAMQPConnection().clear();
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+
+ final ProtonProtocolManagerFactory protocolFactory =
(ProtonProtocolManagerFactory)
+ server.getRemotingService().getProtocolFactoryMap().get("AMQP");
+ assertNotNull(protocolFactory);
+ protocolFactory.updateProtocolServices(server, new ArrayList<>());
+
+ Wait.assertEquals(MESSAGE_COUNT, () ->
remoteSubscriptionQueue.getMessageCount(), 5_000, 100);
+ Wait.assertEquals(0L, () -> subscriptionQueue.getMessageCount(), 5_000,
100);
+
+ try (Connection connection = factoryRemote.createConnection()) {
+ connection.setClientID("test-brigdes");
+ connection.start();
+
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final Topic topic = session.createTopic(getTestName());
+ final MessageConsumer consumer = session.createDurableConsumer(topic,
"bridge-sub");
+
+ for (int i = 0; i < MESSAGE_COUNT; ++i) {
+ final TextMessage message = (TextMessage) consumer.receive(100);
+
+ assertEquals("Message:" + i, message.getText());
+ }
+ }
+ }
+
+ @Test
+ @Timeout(20)
+ public void testBrigeFromQueueUsedToDrainDurableConsumerSubscriptionQueue()
throws Exception {
+ logger.info("Test started: {}", getTestName());
+
+ final long MESSAGE_COUNT = 10;
+
+ remoteServer.start();
+ server.start();
+
+ final ConnectionFactory factoryRemote =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT_REMOTE);
+
+ try (Connection connection = factoryRemote.createConnection()) {
+ connection.setClientID("test-brigdes");
+ connection.start();
+
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final Topic topic = session.createTopic(getTestName());
+ final MessageConsumer consumer = session.createDurableConsumer(topic,
"bridge-sub");
+ final MessageProducer producer = session.createProducer(topic);
+
+ consumer.close();
+
+ for (int i = 0; i < MESSAGE_COUNT; ++i) {
+ final TextMessage message = session.createTextMessage("Message:" +
i);
+
+ message.setStringProperty("color", "red");
+
+ producer.send(message);
+ }
+ }
+
+ final String remoteSubscriptionQueueName =
remoteServer.bindingQuery(SimpleString.of(getTestName())).getQueueNames().get(0).toString();
+
+ assertNotNull(remoteSubscriptionQueueName);
+
+ final org.apache.activemq.artemis.core.server.Queue
remoteSubscriptionQueue = remoteServer.locateQueue(remoteSubscriptionQueueName);
+
+ assertNotNull(remoteSubscriptionQueue);
+ Wait.assertEquals(0L, () -> remoteSubscriptionQueue.getConsumerCount(),
5_000, 100);
+ Wait.assertEquals(MESSAGE_COUNT, () ->
remoteSubscriptionQueue.getMessageCount(), 5_000, 100);
+ assertTrue(remoteSubscriptionQueue.isDurable());
+
+ final ConnectionFactory factoryLocal =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT);
+
+ try (Connection connection = factoryLocal.createConnection()) {
+ connection.setClientID("test-brigdes");
+ connection.start();
+
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final Topic topic = session.createTopic(getTestName());
+ final MessageConsumer consumer = session.createDurableConsumer(topic,
"bridge-sub");
+
+ consumer.close();
+ }
+
+ final String subscriptionQueueName =
server.bindingQuery(SimpleString.of(getTestName())).getQueueNames().get(0).toString();
+
+ assertNotNull(subscriptionQueueName);
+
+ final org.apache.activemq.artemis.core.server.Queue subscriptionQueue =
server.locateQueue(subscriptionQueueName);
+
+ assertNotNull(subscriptionQueue);
+ Wait.assertEquals(0L, () -> subscriptionQueue.getConsumerCount(), 5_000,
100);
+ Wait.assertEquals(0L, () -> subscriptionQueue.getMessageCount(), 5_000,
100);
+ assertTrue(subscriptionQueue.isDurable());
+
+ assertEquals(subscriptionQueueName, subscriptionQueueName);
+
+ final AMQPBridgeQueuePolicyElement bridgePolicy = new
AMQPBridgeQueuePolicyElement();
+ bridgePolicy.setName("test-policy");
+ bridgePolicy.addToIncludes(getTestName(), subscriptionQueueName);
+ bridgePolicy.setRemoteAddress(getTestName() + "::" +
subscriptionQueueName); // Direct the policy on where to get the messages
+
+ final AMQPBridgeBrokerConnectionElement element = new
AMQPBridgeBrokerConnectionElement();
+ element.setName(getTestName());
+ element.addBridgeFromQueuePolicy(bridgePolicy);
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:"
+ SERVER_PORT_REMOTE);
+ amqpConnection.setReconnectAttempts(10); // Limit reconnects
+ amqpConnection.setRetryInterval(50);
+ amqpConnection.addElement(element);
+
+ server.getConfiguration().getAMQPConnection().clear();
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+
+ final ProtonProtocolManagerFactory protocolFactory =
(ProtonProtocolManagerFactory)
+ server.getRemotingService().getProtocolFactoryMap().get("AMQP");
+ assertNotNull(protocolFactory);
+ protocolFactory.updateProtocolServices(server, new ArrayList<>());
+
+ try (Connection connection = factoryLocal.createConnection()) {
+ connection.setClientID("test-brigdes");
+ connection.start();
+
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final Topic topic = session.createTopic(getTestName());
+ final MessageConsumer consumer = session.createDurableConsumer(topic,
"bridge-sub");
+
+ Wait.assertEquals(MESSAGE_COUNT, () ->
subscriptionQueue.getDeliveringCount(), 5_000, 100);
+ Wait.assertEquals(0L, () ->
remoteSubscriptionQueue.getMessageCount(), 5_000, 100);
+
+ for (int i = 0; i < MESSAGE_COUNT; ++i) {
+ final TextMessage message = (TextMessage) consumer.receive(100);
+
+ assertEquals("Message:" + i, message.getText());
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact