This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 8a9128a147 ARTEMIS-5069 / ARTEMIS-5068 Temporary queues are going
through mirroring
8a9128a147 is described below
commit 8a9128a147b969b103e2ed40bfd1e0133868c047
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Mar 18 14:58:05 2026 -0400
ARTEMIS-5069 / ARTEMIS-5068 Temporary queues are going through mirroring
Mirroring should ignore send / create / delete / acks for temporary
queues
---
.../mirror/AMQPMirrorControllerAggregation.java | 4 +-
.../connect/mirror/AMQPMirrorControllerSource.java | 27 +++-
.../connect/mirror/AMQPMirrorControllerTarget.java | 4 +-
.../protocol/amqp/connect/mirror/AckManager.java | 2 +-
.../artemis/core/server/RoutingContext.java | 4 +-
.../core/server/impl/ActiveMQServerImpl.java | 2 +-
.../core/server/impl/RoutingContextImpl.java | 17 +--
.../core/server/mirror/MirrorController.java | 2 +-
.../core/server/impl/RoutingContextTest.java | 16 +--
.../integration/amqp/connect/BrokerInSyncTest.java | 156 +++++++++++++++++----
.../amqp/connect/StopDuringMirrorTest.java | 4 +-
11 files changed, 180 insertions(+), 58 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerAggregation.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerAggregation.java
index b86318dc05..b208be947b 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerAggregation.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerAggregation.java
@@ -105,9 +105,9 @@ public class AMQPMirrorControllerAggregation implements
MirrorController, Active
}
@Override
- public void deleteQueue(SimpleString addressName, SimpleString queueName)
throws Exception {
+ public void deleteQueue(SimpleString addressName, SimpleString queueName,
QueueConfiguration queueConfiguration) throws Exception {
for (MirrorController partition : partitions) {
- partition.deleteQueue(addressName, queueName);
+ partition.deleteQueue(addressName, queueName, queueConfiguration);
}
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
index 9bd23b5b9c..b9e018c6e4 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
@@ -238,10 +238,14 @@ public class AMQPMirrorControllerSource extends
BasicMirrorController<Sender> im
}
logger.trace("{} deleteAddress {}", server, addressInfo);
- if (invalidTarget(getControllerInUse()) || addressInfo.isInternal()) {
+ if (invalidTarget(getControllerInUse()) || addressInfo.isInternal() ||
addressInfo.isTemporary()) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("ignoring deleteAddress for invalidTarget = {},
isInternal = {}, isTemporary = {}", invalidTarget(getControllerInUse()),
addressInfo.isInternal(), addressInfo.isTemporary());
+ }
return;
}
if (ignoreAddress(addressInfo.getName())) {
+ logger.trace("ignoring deleteAddress {} for ignoreAddress condition",
addressInfo.getName());
return;
}
if (deleteQueues) {
@@ -282,7 +286,7 @@ public class AMQPMirrorControllerSource extends
BasicMirrorController<Sender> im
}
@Override
- public void deleteQueue(SimpleString address, SimpleString queue) throws
Exception {
+ public void deleteQueue(SimpleString address, SimpleString queue,
QueueConfiguration queueConfiguration) throws Exception {
if (!brokerConnection.isEnabled()) {
return;
}
@@ -298,6 +302,15 @@ public class AMQPMirrorControllerSource extends
BasicMirrorController<Sender> im
return;
}
+ if (queueConfiguration != null) {
+ if (queueConfiguration.isTemporary() ||
queueConfiguration.isInternal()) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("deleteQueue {}/{} ignored for isTemporary = {} or
isInternal = {}", address, queue, queueConfiguration.isTemporary(),
queueConfiguration.isInternal());
+ }
+ return;
+ }
+ }
+
if (deleteQueues) {
Message message = createMessage(address, queue, DELETE_QUEUE, null,
queue.toString());
routeMirrorCommand(server, message);
@@ -355,8 +368,8 @@ public class AMQPMirrorControllerSource extends
BasicMirrorController<Sender> im
}
SimpleString address = context.getAddress(message);
- if (context.isInternal()) {
- logger.trace("sendMessage::server {} is discarding send to avoid
sending to internal queue", server);
+ if (context.isMirrorIgnore()) {
+ logger.trace("sendMessage::server {} is discarding send to avoid
sending to internal or temporary queue", server);
return;
}
@@ -587,9 +600,9 @@ public class AMQPMirrorControllerSource extends
BasicMirrorController<Sender> im
return;
}
- if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() ||
ref.getQueue().isMirrorController()))) {
- if (logger.isDebugEnabled()) {
- logger.debug("preAcknowledge::{} rejecting preAcknowledge
queue={}, ref={} to avoid infinite loop with the mirror (reflection)", server,
ref.getQueue().getName(), ref);
+ if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() ||
ref.getQueue().isTemporary() || ref.getQueue().isMirrorController()))) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("ignoring preAcknowledge on ref {} for either
internalQueue = {}, temporary = {}, isMirrorController = {}", ref,
ref.getQueue().isInternalQueue(), ref.getQueue().isTemporary(),
ref.getQueue().isMirrorController());
}
return;
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
index e1e3020c55..f8ccb77bed 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
@@ -318,7 +318,7 @@ public class AMQPMirrorControllerTarget extends
ProtonAbstractReceiver implement
String address = (String)
AMQPMessageBrokerAccessor.getMessageAnnotationProperty(amqpMessage, ADDRESS);
String queueName = (String)
AMQPMessageBrokerAccessor.getMessageAnnotationProperty(amqpMessage, QUEUE);
- deleteQueue(SimpleString.of(address),
SimpleString.of(queueName));
+ deleteQueue(SimpleString.of(address),
SimpleString.of(queueName), null);
} else if (eventType.equals(POST_ACK)) {
String nodeID = (String)
AMQPMessageBrokerAccessor.getMessageAnnotationProperty(amqpMessage, BROKER_ID);
@@ -440,7 +440,7 @@ public class AMQPMirrorControllerTarget extends
ProtonAbstractReceiver implement
}
@Override
- public void deleteQueue(SimpleString addressName, SimpleString queueName)
throws Exception {
+ public void deleteQueue(SimpleString addressName, SimpleString queueName,
QueueConfiguration configuration) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("{} destroy queue {} on address = {} server {}", server,
queueName, addressName, server.getIdentity());
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
index a7f1b8654f..d26747fd20 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
@@ -640,7 +640,7 @@ public class AckManager implements ActiveMQComponent {
}
@Override
- public void deleteQueue(SimpleString addressName, SimpleString
queueName) throws Exception {
+ public void deleteQueue(SimpleString addressName, SimpleString
queueName, QueueConfiguration configuration) throws Exception {
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
index 44101dc8f6..fe9c4f3f1f 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
@@ -54,9 +54,9 @@ public interface RoutingContext {
boolean isMirrorIndividualRoute();
/**
- * return true if every queue routed is internal
+ * return true if every queue routed is internal or temporary
*/
- boolean isInternal();
+ boolean isMirrorIgnore();
MirrorController getMirrorSource();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 875597cd36..c3db8beead 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -2616,7 +2616,7 @@ public class ActiveMQServerImpl implements ActiveMQServer
{
}
if (mirrorControllerService != null) {
- mirrorControllerService.deleteQueue(queue.getAddress(),
queue.getName());
+ mirrorControllerService.deleteQueue(queue.getAddress(),
queue.getName(), queue.getQueueConfiguration());
}
queue.deleteQueue(removeConsumers);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
index 3898248a0c..1f7a7a9dc6 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
@@ -59,7 +59,7 @@ public class RoutingContextImpl implements RoutingContext {
Boolean reusable = null;
- Boolean internalOnly = null;
+ Boolean mirrorIgnore = null;
boolean divertDisabled = false;
@@ -130,10 +130,11 @@ public class RoutingContextImpl implements RoutingContext
{
}
@Override
- public boolean isInternal() {
- return internalOnly != null && internalOnly;
+ public boolean isMirrorIgnore() {
+ return mirrorIgnore != null && mirrorIgnore;
}
+
@Override
public int getPreviousBindingsVersion() {
return version;
@@ -177,7 +178,7 @@ public class RoutingContextImpl implements RoutingContext {
this.reusable = null;
- this.internalOnly = null;
+ this.mirrorIgnore = null;
// once we set to disabled, we keep it always disabled.
// This is because the routing object used to route commands will
disable this
@@ -211,11 +212,11 @@ public class RoutingContextImpl implements RoutingContext
{
listing.getNonDurableQueues().add(queue);
}
- if (internalOnly == null) {
- internalOnly = queue.isInternalQueue();
+ if (mirrorIgnore == null) {
+ mirrorIgnore = queue.isInternalQueue() || queue.isTemporary();
} else {
- // every queue added has to be internal only
- internalOnly = internalOnly && queue.isInternalQueue();
+ // making sure that every queue added matches the mirrorIgnore
+ mirrorIgnore = mirrorIgnore && (queue.isInternalQueue() ||
queue.isTemporary());
}
queueCount++;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/MirrorController.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/MirrorController.java
index fb194cca6e..d4a22763d1 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/MirrorController.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/MirrorController.java
@@ -36,7 +36,7 @@ public interface MirrorController {
void addAddress(AddressInfo addressInfo) throws Exception;
void deleteAddress(AddressInfo addressInfo) throws Exception;
void createQueue(QueueConfiguration queueConfiguration) throws Exception;
- void deleteQueue(SimpleString addressName, SimpleString queueName) throws
Exception;
+ void deleteQueue(SimpleString addressName, SimpleString queueName,
QueueConfiguration queueConfiguration) throws Exception;
void sendMessage(Transaction tx, Message message, RoutingContext context);
void postAcknowledge(MessageReference ref, AckReason reason) throws
Exception;
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java
index e3816a489f..f943650e96 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java
@@ -949,28 +949,28 @@ public class RoutingContextTest {
@Test
public void testValidateInternal() {
RoutingContext context = new RoutingContextImpl(new TransactionImpl(new
NullStorageManager()));
- assertFalse(context.isInternal());
+ assertFalse(context.isMirrorIgnore());
context.addQueue(SimpleString.of("t1"), new
FakeQueueForRoutingContextTest("t1", true, true));
- assertTrue(context.isInternal());
+ assertTrue(context.isMirrorIgnore());
context.addQueue(SimpleString.of("t2"), new
FakeQueueForRoutingContextTest("t2", false, true));
- assertFalse(context.isInternal());
+ assertFalse(context.isMirrorIgnore());
context.addQueue(SimpleString.of("t3"), new
FakeQueueForRoutingContextTest("t3", true, true));
- assertFalse(context.isInternal());
+ assertFalse(context.isMirrorIgnore());
context.clear();
- assertFalse(context.isInternal());
+ assertFalse(context.isMirrorIgnore());
context.addQueue(SimpleString.of("t1"), new
FakeQueueForRoutingContextTest("t1", true, true));
- assertTrue(context.isInternal());
+ assertTrue(context.isMirrorIgnore());
context.addQueue(SimpleString.of("t2"), new
FakeQueueForRoutingContextTest("t2", true, true));
- assertTrue(context.isInternal());
+ assertTrue(context.isMirrorIgnore());
context.addQueue(SimpleString.of("t3"), new
FakeQueueForRoutingContextTest("t3", true, true));
- assertTrue(context.isInternal());
+ assertTrue(context.isMirrorIgnore());
}
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.java
index 1696a108af..3f5f67efad 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.java
@@ -26,11 +26,13 @@ import static org.junit.jupiter.api.Assertions.fail;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import java.io.PrintStream;
import java.net.URI;
@@ -116,7 +118,7 @@ public class BrokerInSyncTest extends AmqpClientTestSupport
{
public void testSyncOnCreateQueues() throws Exception {
server.setIdentity("Server1");
{
- AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" +
AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
+ AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" +
AMQP_PORT_2).setReconnectAttempts(300).setRetryInterval(100);
amqpConnection.addElement(new
AMQPMirrorBrokerConnectionElement().setDurable(true));
server.getConfiguration().addAMQPConnection(amqpConnection);
}
@@ -126,7 +128,7 @@ public class BrokerInSyncTest extends AmqpClientTestSupport
{
server_2.setIdentity("Server2");
{
- AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" +
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+ AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" +
AMQP_PORT).setReconnectAttempts(300).setRetryInterval(100);
amqpConnection.addElement(new
AMQPMirrorBrokerConnectionElement().setDurable(true));
server_2.getConfiguration().addAMQPConnection(amqpConnection);
}
@@ -265,23 +267,129 @@ public class BrokerInSyncTest extends
AmqpClientTestSupport {
session1.commit();
}
- try {
- connection1.close();
- } catch (Exception ignored) {
+ connection1.close();
+ connection2.close();
+
+ Wait.assertEquals(0L, queueOnServer1::getMessageCount, 5000, 100);
+ Wait.assertEquals(0L, queueOnServer2::getMessageCount, 5000, 100);
+
+ server_2.stop();
+ server.stop();
+ }
+
+ @Test
+ public void testNoTemporaryAddressesOrQueues() throws Exception {
+ final String snfOnServer1Name =
"$ACTIVEMQ_ARTEMIS_MIRROR_connectTowardsServer2";
+
+ server.getConfiguration().setAddressQueueScanPeriod(100);
+ server.setIdentity("Server1");
+ {
+ AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" +
AMQP_PORT_2).setReconnectAttempts(300).setRetryInterval(100);
+ amqpConnection.addElement(new
AMQPMirrorBrokerConnectionElement().setDurable(true));
+ server.getConfiguration().addAMQPConnection(amqpConnection);
}
- try {
- connection2.close();
- } catch (Exception ignored) {
+ server.start();
+ server.addAddressInfo(new
AddressInfo(getQueueName()).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST));
+
server.createQueue(QueueConfiguration.of(getQueueName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
+
+ server_2 = createServer(AMQP_PORT_2, false);
+ server_2.setIdentity("Server2");
+
+ {
+ AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" +
AMQP_PORT).setReconnectAttempts(300).setRetryInterval(100);
+ amqpConnection.addElement(new
AMQPMirrorBrokerConnectionElement().setDurable(true));
+ server_2.getConfiguration().addAMQPConnection(amqpConnection);
}
+ server_2.start();
+ server_2.addAddressInfo(new
AddressInfo(getQueueName()).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST));
+
server_2.createQueue(QueueConfiguration.of(getQueueName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
- Wait.assertEquals(0L, queueOnServer1::getMessageCount, 5000, 100);
- Wait.assertEquals(0L, queueOnServer2::getMessageCount, 5000, 100);
+
+ Wait.waitFor(() -> server.locateQueue(snfOnServer1Name) != null);
+
+ ConnectionFactory factoryServer1 =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ // exercising a legal mirror operation to make sure things are working
properly
+ try (Connection connection = factoryServer1.createConnection()) {
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue(getQueueName());
+ MessageProducer producer = session.createProducer(queue);
+ MessageConsumer consumer = session.createConsumer(queue);
+ connection.start();
+ producer.send(session.createMessage());
+ session.commit();
+ assertNotNull(consumer.receive(5000));
+ session.commit();
+ }
+
+ org.apache.activemq.artemis.core.server.Queue snfOnServer1 =
server.locateQueue(snfOnServer1Name);
+ Wait.assertEquals(0L, snfOnServer1::getMessageCount, 5000, 100);
+
+
+ // stopping the server to let things accumulate so we can assert the SNF
+ server_2.stop();
+
+ tempAddressSendAndReceive(factoryServer1, snfOnServer1, true);
+ tempAddressSendAndReceive(factoryServer1, snfOnServer1, false);
server_2.stop();
server.stop();
}
+ private void tempAddressSendAndReceive(ConnectionFactory factoryServer1,
org.apache.activemq.artemis.core.server.Queue snfQueue, boolean useTopic)
throws Exception {
+ String addressName;
+ try (Connection connectionServer1 = factoryServer1.createConnection()) {
+ Session session = connectionServer1.createSession(true,
Session.SESSION_TRANSACTED);
+ Destination destination;
+
+ if (useTopic) {
+ TemporaryTopic topic = session.createTemporaryTopic();
+ destination = topic;
+ addressName = topic.getTopicName();
+ } else {
+ TemporaryQueue queue = session.createTemporaryQueue();
+ destination = queue;
+ addressName = queue.getQueueName();
+ }
+
+
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ connectionServer1.start();
+
+ MessageProducer producer = session.createProducer(destination);
+ producer.send(session.createTextMessage());
+ session.commit();
+
+ assertNotNull(consumer.receive(5000));
+
+ session.commit();
+
+ Wait.assertEquals(0L, snfQueue::getMessageCount, 5000, 100);
+
+ // stopping the server to validate things are not accumulating
+
+ for (int i = 0; i < 100; i++) {
+ // sends should not make into the SNF either
+ producer.send(session.createTextMessage());
+ }
+ session.commit();
+ // no temporary sends
+ Wait.assertEquals(0L, snfQueue::getMessageCount, 5000, 100);
+
+ for (int i = 0; i < 100; i++) {
+ assertNotNull(consumer.receive(5000));
+ }
+ session.commit();
+
+ // no temporary acks
+ Wait.assertEquals(0L, snfQueue::getMessageCount, 5000, 100);
+ }
+
+ Wait.assertTrue(() ->
server.getAddressInfo(SimpleString.of(addressName)) == null, 5000, 100);
+ Wait.assertEquals(0L, snfQueue::getMessageCount, 5000, 100);
+ }
private void checkProperties(Connection connection, javax.jms.Message
message) throws Exception {
try (Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE)) {
@@ -325,7 +433,7 @@ public class BrokerInSyncTest extends AmqpClientTestSupport
{
private void internalExpiry(boolean useReaper) throws Exception {
server.setIdentity("Server1");
{
- AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("to_2", "tcp://localhost:" +
AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
+ AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("to_2", "tcp://localhost:" +
AMQP_PORT_2).setReconnectAttempts(300).setRetryInterval(100);
amqpConnection.addElement(new
AMQPMirrorBrokerConnectionElement().setDurable(true));
server.getConfiguration().addAMQPConnection(amqpConnection);
}
@@ -429,7 +537,7 @@ public class BrokerInSyncTest extends AmqpClientTestSupport
{
public void testDLA() throws Exception {
server.setIdentity("Server1");
{
- AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("to_2", "tcp://localhost:" +
AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
+ AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("to_2", "tcp://localhost:" +
AMQP_PORT_2).setReconnectAttempts(300).setRetryInterval(100);
amqpConnection.addElement(new
AMQPMirrorBrokerConnectionElement().setDurable(true));
server.getConfiguration().addAMQPConnection(amqpConnection);
}
@@ -545,7 +653,7 @@ public class BrokerInSyncTest extends AmqpClientTestSupport
{
public void testCreateInternalQueue() throws Exception {
server.setIdentity("Server1");
{
- AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("to_2", "tcp://localhost:" +
AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
+ AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("to_2", "tcp://localhost:" +
AMQP_PORT_2).setReconnectAttempts(300).setRetryInterval(100);
amqpConnection.addElement(new
AMQPMirrorBrokerConnectionElement().setDurable(true));
server.getConfiguration().addAMQPConnection(amqpConnection);
}
@@ -567,7 +675,7 @@ public class BrokerInSyncTest extends AmqpClientTestSupport
{
server_2.setIdentity("Server2");
{
- AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("to_1", "tcp://localhost:" +
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+ AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("to_1", "tcp://localhost:" +
AMQP_PORT).setReconnectAttempts(300).setRetryInterval(100);
amqpConnection.addElement(new
AMQPMirrorBrokerConnectionElement().setDurable(true));
server_2.getConfiguration().addAMQPConnection(amqpConnection);
}
@@ -609,7 +717,7 @@ public class BrokerInSyncTest extends AmqpClientTestSupport
{
server.setIdentity("Server1");
{
- AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("to_2", "tcp://localhost:" +
AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
+ AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("to_2", "tcp://localhost:" +
AMQP_PORT_2).setReconnectAttempts(300).setRetryInterval(100);
amqpConnection.addElement(new
AMQPMirrorBrokerConnectionElement().setDurable(true));
server.getConfiguration().addAMQPConnection(amqpConnection);
}
@@ -636,7 +744,7 @@ public class BrokerInSyncTest extends AmqpClientTestSupport
{
server_2.setIdentity("Server2");
{
- AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("to_1", "tcp://localhost:" +
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+ AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("to_1", "tcp://localhost:" +
AMQP_PORT).setReconnectAttempts(300).setRetryInterval(100);
amqpConnection.addElement(new
AMQPMirrorBrokerConnectionElement().setDurable(true));
server_2.getConfiguration().addAMQPConnection(amqpConnection);
}
@@ -685,7 +793,7 @@ public class BrokerInSyncTest extends AmqpClientTestSupport
{
int NUMBER_OF_MESSAGES = 100;
server.setIdentity("Server1");
{
- AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" +
AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
+ AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" +
AMQP_PORT_2).setReconnectAttempts(300).setRetryInterval(100);
amqpConnection.addElement(new
AMQPMirrorBrokerConnectionElement().setDurable(true));
server.getConfiguration().addAMQPConnection(amqpConnection);
}
@@ -695,7 +803,7 @@ public class BrokerInSyncTest extends AmqpClientTestSupport
{
server_2.setIdentity("Server2");
{
- AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" +
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+ AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" +
AMQP_PORT).setReconnectAttempts(300).setRetryInterval(100);
amqpConnection.addElement(new
AMQPMirrorBrokerConnectionElement().setDurable(true));
server_2.getConfiguration().addAMQPConnection(amqpConnection);
}
@@ -803,7 +911,7 @@ public class BrokerInSyncTest extends AmqpClientTestSupport
{
int NUMBER_OF_MESSAGES = 1;
server.setIdentity("Server1");
{
- AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" +
AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
+ AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" +
AMQP_PORT_2).setReconnectAttempts(300).setRetryInterval(100);
amqpConnection.addElement(new
AMQPMirrorBrokerConnectionElement().setDurable(true));
server.getConfiguration().addAMQPConnection(amqpConnection);
}
@@ -813,7 +921,7 @@ public class BrokerInSyncTest extends AmqpClientTestSupport
{
server_2.setIdentity("Server2");
{
- AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" +
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+ AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" +
AMQP_PORT).setReconnectAttempts(300).setRetryInterval(100);
amqpConnection.addElement(new
AMQPMirrorBrokerConnectionElement().setDurable(true));
server_2.getConfiguration().addAMQPConnection(amqpConnection);
}
@@ -911,7 +1019,7 @@ public class BrokerInSyncTest extends
AmqpClientTestSupport {
server.setIdentity("Server1");
{
- AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" +
AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
+ AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" +
AMQP_PORT_2).setReconnectAttempts(300).setRetryInterval(100);
amqpConnection.addElement(new
AMQPMirrorBrokerConnectionElement().setDurable(true));
server.getConfiguration().addAMQPConnection(amqpConnection);
}
@@ -921,7 +1029,7 @@ public class BrokerInSyncTest extends
AmqpClientTestSupport {
server_2.setIdentity("Server2");
{
- AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" +
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+ AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" +
AMQP_PORT).setReconnectAttempts(300).setRetryInterval(100);
amqpConnection.addElement(new
AMQPMirrorBrokerConnectionElement().setDurable(true));
server_2.getConfiguration().addAMQPConnection(amqpConnection);
}
@@ -998,7 +1106,7 @@ public class BrokerInSyncTest extends
AmqpClientTestSupport {
String queueName = "testSyncLargeMessage";
server.setIdentity("Server1");
{
- AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" +
AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
+ AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" +
AMQP_PORT_2).setReconnectAttempts(300).setRetryInterval(100);
amqpConnection.addElement(new
AMQPMirrorBrokerConnectionElement().setDurable(true));
server.getConfiguration().addAMQPConnection(amqpConnection);
}
@@ -1008,7 +1116,7 @@ public class BrokerInSyncTest extends
AmqpClientTestSupport {
server_2.setIdentity("Server2");
{
- AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" +
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+ AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" +
AMQP_PORT).setReconnectAttempts(300).setRetryInterval(100);
amqpConnection.addElement(new
AMQPMirrorBrokerConnectionElement().setDurable(true));
server_2.getConfiguration().addAMQPConnection(amqpConnection);
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/StopDuringMirrorTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/StopDuringMirrorTest.java
index f4df4e7bf7..732a7f9c5c 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/StopDuringMirrorTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/StopDuringMirrorTest.java
@@ -181,8 +181,8 @@ public class StopDuringMirrorTest extends ActiveMQTestBase {
}
@Override
- public void deleteQueue(SimpleString addressName, SimpleString
queueName) throws Exception {
- target.deleteQueue(addressName, queueName);
+ public void deleteQueue(SimpleString addressName, SimpleString
queueName, QueueConfiguration configuration) throws Exception {
+ target.deleteQueue(addressName, queueName, configuration);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]