This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 8a9128a147 ARTEMIS-5069 / ARTEMIS-5068 Temporary queues are going 
through mirroring
8a9128a147 is described below

commit 8a9128a147b969b103e2ed40bfd1e0133868c047
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Mar 18 14:58:05 2026 -0400

    ARTEMIS-5069 / ARTEMIS-5068 Temporary queues are going through mirroring
    
    Mirroring should ignore send / create / delete / acks for temporary
    queues
---
 .../mirror/AMQPMirrorControllerAggregation.java    |   4 +-
 .../connect/mirror/AMQPMirrorControllerSource.java |  27 +++-
 .../connect/mirror/AMQPMirrorControllerTarget.java |   4 +-
 .../protocol/amqp/connect/mirror/AckManager.java   |   2 +-
 .../artemis/core/server/RoutingContext.java        |   4 +-
 .../core/server/impl/ActiveMQServerImpl.java       |   2 +-
 .../core/server/impl/RoutingContextImpl.java       |  17 +--
 .../core/server/mirror/MirrorController.java       |   2 +-
 .../core/server/impl/RoutingContextTest.java       |  16 +--
 .../integration/amqp/connect/BrokerInSyncTest.java | 156 +++++++++++++++++----
 .../amqp/connect/StopDuringMirrorTest.java         |   4 +-
 11 files changed, 180 insertions(+), 58 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerAggregation.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerAggregation.java
index b86318dc05..b208be947b 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerAggregation.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerAggregation.java
@@ -105,9 +105,9 @@ public class AMQPMirrorControllerAggregation implements 
MirrorController, Active
    }
 
    @Override
-   public void deleteQueue(SimpleString addressName, SimpleString queueName) 
throws Exception {
+   public void deleteQueue(SimpleString addressName, SimpleString queueName, 
QueueConfiguration queueConfiguration) throws Exception {
       for (MirrorController partition : partitions) {
-         partition.deleteQueue(addressName, queueName);
+         partition.deleteQueue(addressName, queueName, queueConfiguration);
       }
    }
 
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
index 9bd23b5b9c..b9e018c6e4 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
@@ -238,10 +238,14 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
       }
       logger.trace("{} deleteAddress {}", server, addressInfo);
 
-      if (invalidTarget(getControllerInUse()) || addressInfo.isInternal()) {
+      if (invalidTarget(getControllerInUse()) || addressInfo.isInternal() || 
addressInfo.isTemporary()) {
+         if (logger.isTraceEnabled()) {
+            logger.trace("ignoring deleteAddress for invalidTarget = {}, 
isInternal = {}, isTemporary = {}", invalidTarget(getControllerInUse()), 
addressInfo.isInternal(), addressInfo.isTemporary());
+         }
          return;
       }
       if (ignoreAddress(addressInfo.getName())) {
+         logger.trace("ignoring deleteAddress {} for ignoreAddress condition", 
addressInfo.getName());
          return;
       }
       if (deleteQueues) {
@@ -282,7 +286,7 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
    }
 
    @Override
-   public void deleteQueue(SimpleString address, SimpleString queue) throws 
Exception {
+   public void deleteQueue(SimpleString address, SimpleString queue, 
QueueConfiguration queueConfiguration) throws Exception {
       if (!brokerConnection.isEnabled()) {
          return;
       }
@@ -298,6 +302,15 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
          return;
       }
 
+      if (queueConfiguration != null) {
+         if (queueConfiguration.isTemporary() || 
queueConfiguration.isInternal()) {
+            if (logger.isTraceEnabled()) {
+               logger.trace("deleteQueue {}/{} ignored for isTemporary = {} or 
isInternal = {}", address, queue, queueConfiguration.isTemporary(), 
queueConfiguration.isInternal());
+            }
+            return;
+         }
+      }
+
       if (deleteQueues) {
          Message message = createMessage(address, queue, DELETE_QUEUE, null, 
queue.toString());
          routeMirrorCommand(server, message);
@@ -355,8 +368,8 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
       }
       SimpleString address = context.getAddress(message);
 
-      if (context.isInternal()) {
-         logger.trace("sendMessage::server {} is discarding send to avoid 
sending to internal queue", server);
+      if (context.isMirrorIgnore()) {
+         logger.trace("sendMessage::server {} is discarding send to avoid 
sending to internal or temporary queue", server);
          return;
       }
 
@@ -587,9 +600,9 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
          return;
       }
 
-      if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() || 
ref.getQueue().isMirrorController()))) {
-         if (logger.isDebugEnabled()) {
-            logger.debug("preAcknowledge::{} rejecting preAcknowledge 
queue={}, ref={} to avoid infinite loop with the mirror (reflection)", server, 
ref.getQueue().getName(), ref);
+      if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() || 
ref.getQueue().isTemporary() || ref.getQueue().isMirrorController()))) {
+         if (logger.isTraceEnabled()) {
+            logger.trace("ignoring preAcknowledge on ref {} for either 
internalQueue = {}, temporary = {}, isMirrorController = {}", ref, 
ref.getQueue().isInternalQueue(), ref.getQueue().isTemporary(), 
ref.getQueue().isMirrorController());
          }
          return;
       }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
index e1e3020c55..f8ccb77bed 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
@@ -318,7 +318,7 @@ public class AMQPMirrorControllerTarget extends 
ProtonAbstractReceiver implement
                   String address = (String) 
AMQPMessageBrokerAccessor.getMessageAnnotationProperty(amqpMessage, ADDRESS);
                   String queueName = (String) 
AMQPMessageBrokerAccessor.getMessageAnnotationProperty(amqpMessage, QUEUE);
 
-                  deleteQueue(SimpleString.of(address), 
SimpleString.of(queueName));
+                  deleteQueue(SimpleString.of(address), 
SimpleString.of(queueName), null);
                } else if (eventType.equals(POST_ACK)) {
                   String nodeID = (String) 
AMQPMessageBrokerAccessor.getMessageAnnotationProperty(amqpMessage, BROKER_ID);
 
@@ -440,7 +440,7 @@ public class AMQPMirrorControllerTarget extends 
ProtonAbstractReceiver implement
    }
 
    @Override
-   public void deleteQueue(SimpleString addressName, SimpleString queueName) 
throws Exception {
+   public void deleteQueue(SimpleString addressName, SimpleString queueName, 
QueueConfiguration configuration) throws Exception {
       if (logger.isDebugEnabled()) {
          logger.debug("{} destroy queue {} on address = {} server {}", server, 
queueName, addressName, server.getIdentity());
       }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
index a7f1b8654f..d26747fd20 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
@@ -640,7 +640,7 @@ public class AckManager implements ActiveMQComponent {
       }
 
       @Override
-      public void deleteQueue(SimpleString addressName, SimpleString 
queueName) throws Exception {
+      public void deleteQueue(SimpleString addressName, SimpleString 
queueName, QueueConfiguration configuration) throws Exception {
 
       }
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
index 44101dc8f6..fe9c4f3f1f 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
@@ -54,9 +54,9 @@ public interface RoutingContext {
    boolean isMirrorIndividualRoute();
 
    /**
-    * return true if every queue routed is internal
+    * return true if every queue routed is internal or temporary
     */
-   boolean isInternal();
+   boolean isMirrorIgnore();
 
    MirrorController getMirrorSource();
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 875597cd36..c3db8beead 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -2616,7 +2616,7 @@ public class ActiveMQServerImpl implements ActiveMQServer 
{
          }
 
          if (mirrorControllerService != null) {
-            mirrorControllerService.deleteQueue(queue.getAddress(), 
queue.getName());
+            mirrorControllerService.deleteQueue(queue.getAddress(), 
queue.getName(), queue.getQueueConfiguration());
          }
 
          queue.deleteQueue(removeConsumers);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
index 3898248a0c..1f7a7a9dc6 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
@@ -59,7 +59,7 @@ public class RoutingContextImpl implements RoutingContext {
 
    Boolean reusable = null;
 
-   Boolean internalOnly = null;
+   Boolean mirrorIgnore = null;
 
    boolean divertDisabled = false;
 
@@ -130,10 +130,11 @@ public class RoutingContextImpl implements RoutingContext 
{
    }
 
    @Override
-   public boolean isInternal() {
-      return internalOnly != null && internalOnly;
+   public boolean isMirrorIgnore() {
+      return mirrorIgnore != null && mirrorIgnore;
    }
 
+
    @Override
    public int getPreviousBindingsVersion() {
       return version;
@@ -177,7 +178,7 @@ public class RoutingContextImpl implements RoutingContext {
 
       this.reusable = null;
 
-      this.internalOnly = null;
+      this.mirrorIgnore = null;
 
       // once we set to disabled, we keep it always disabled.
       // This is because the routing object used to route commands will 
disable this
@@ -211,11 +212,11 @@ public class RoutingContextImpl implements RoutingContext 
{
          listing.getNonDurableQueues().add(queue);
       }
 
-      if (internalOnly == null) {
-         internalOnly = queue.isInternalQueue();
+      if (mirrorIgnore == null) {
+         mirrorIgnore = queue.isInternalQueue() || queue.isTemporary();
       } else {
-         // every queue added has to be internal only
-         internalOnly = internalOnly && queue.isInternalQueue();
+         // mirrorIgnore stays true only while every queue added so far is internal or temporary
+         mirrorIgnore = mirrorIgnore && (queue.isInternalQueue() || 
queue.isTemporary());
       }
 
       queueCount++;
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/MirrorController.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/MirrorController.java
index fb194cca6e..d4a22763d1 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/MirrorController.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/MirrorController.java
@@ -36,7 +36,7 @@ public interface MirrorController {
    void addAddress(AddressInfo addressInfo) throws Exception;
    void deleteAddress(AddressInfo addressInfo) throws Exception;
    void createQueue(QueueConfiguration queueConfiguration) throws Exception;
-   void deleteQueue(SimpleString addressName, SimpleString queueName) throws 
Exception;
+   void deleteQueue(SimpleString addressName, SimpleString queueName, 
QueueConfiguration queueConfiguration) throws Exception;
    void sendMessage(Transaction tx, Message message, RoutingContext context);
 
    void postAcknowledge(MessageReference ref, AckReason reason) throws 
Exception;
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java
index e3816a489f..f943650e96 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java
@@ -949,28 +949,28 @@ public class RoutingContextTest {
    @Test
    public void testValidateInternal() {
       RoutingContext context = new RoutingContextImpl(new TransactionImpl(new 
NullStorageManager()));
-      assertFalse(context.isInternal());
+      assertFalse(context.isMirrorIgnore());
 
       context.addQueue(SimpleString.of("t1"), new 
FakeQueueForRoutingContextTest("t1", true, true));
-      assertTrue(context.isInternal());
+      assertTrue(context.isMirrorIgnore());
 
       context.addQueue(SimpleString.of("t2"), new 
FakeQueueForRoutingContextTest("t2", false, true));
-      assertFalse(context.isInternal());
+      assertFalse(context.isMirrorIgnore());
 
       context.addQueue(SimpleString.of("t3"), new 
FakeQueueForRoutingContextTest("t3", true, true));
-      assertFalse(context.isInternal());
+      assertFalse(context.isMirrorIgnore());
 
       context.clear();
-      assertFalse(context.isInternal());
+      assertFalse(context.isMirrorIgnore());
 
       context.addQueue(SimpleString.of("t1"), new 
FakeQueueForRoutingContextTest("t1", true, true));
-      assertTrue(context.isInternal());
+      assertTrue(context.isMirrorIgnore());
 
       context.addQueue(SimpleString.of("t2"), new 
FakeQueueForRoutingContextTest("t2", true, true));
-      assertTrue(context.isInternal());
+      assertTrue(context.isMirrorIgnore());
 
       context.addQueue(SimpleString.of("t3"), new 
FakeQueueForRoutingContextTest("t3", true, true));
-      assertTrue(context.isInternal());
+      assertTrue(context.isMirrorIgnore());
    }
 
 }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.java
index 1696a108af..3f5f67efad 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.java
@@ -26,11 +26,13 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
 import javax.jms.TextMessage;
 import java.io.PrintStream;
 import java.net.URI;
@@ -116,7 +118,7 @@ public class BrokerInSyncTest extends AmqpClientTestSupport 
{
    public void testSyncOnCreateQueues() throws Exception {
       server.setIdentity("Server1");
       {
-         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + 
AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
+         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + 
AMQP_PORT_2).setReconnectAttempts(300).setRetryInterval(100);
          amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setDurable(true));
          server.getConfiguration().addAMQPConnection(amqpConnection);
       }
@@ -126,7 +128,7 @@ public class BrokerInSyncTest extends AmqpClientTestSupport 
{
       server_2.setIdentity("Server2");
 
       {
-         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + 
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + 
AMQP_PORT).setReconnectAttempts(300).setRetryInterval(100);
          amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setDurable(true));
          server_2.getConfiguration().addAMQPConnection(amqpConnection);
       }
@@ -265,23 +267,129 @@ public class BrokerInSyncTest extends 
AmqpClientTestSupport {
          session1.commit();
       }
 
-      try {
-         connection1.close();
-      } catch (Exception ignored) {
+      connection1.close();
+      connection2.close();
+
+      Wait.assertEquals(0L, queueOnServer1::getMessageCount, 5000, 100);
+      Wait.assertEquals(0L, queueOnServer2::getMessageCount, 5000, 100);
+
+      server_2.stop();
+      server.stop();
+   }
+
+   @Test
+   public void testNoTemporaryAddressesOrQueues() throws Exception {
+      final String snfOnServer1Name = 
"$ACTIVEMQ_ARTEMIS_MIRROR_connectTowardsServer2";
+
+      server.getConfiguration().setAddressQueueScanPeriod(100);
+      server.setIdentity("Server1");
+      {
+         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + 
AMQP_PORT_2).setReconnectAttempts(300).setRetryInterval(100);
+         amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setDurable(true));
+         server.getConfiguration().addAMQPConnection(amqpConnection);
       }
 
-      try {
-         connection2.close();
-      } catch (Exception ignored) {
+      server.start();
+      server.addAddressInfo(new 
AddressInfo(getQueueName()).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST));
+      
server.createQueue(QueueConfiguration.of(getQueueName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
+
+      server_2 = createServer(AMQP_PORT_2, false);
+      server_2.setIdentity("Server2");
+
+      {
+         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + 
AMQP_PORT).setReconnectAttempts(300).setRetryInterval(100);
+         amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setDurable(true));
+         server_2.getConfiguration().addAMQPConnection(amqpConnection);
       }
+      server_2.start();
+      server_2.addAddressInfo(new 
AddressInfo(getQueueName()).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST));
+      
server_2.createQueue(QueueConfiguration.of(getQueueName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
 
-      Wait.assertEquals(0L, queueOnServer1::getMessageCount, 5000, 100);
-      Wait.assertEquals(0L, queueOnServer2::getMessageCount, 5000, 100);
+
+      Wait.waitFor(() -> server.locateQueue(snfOnServer1Name) != null);
+
+      ConnectionFactory factoryServer1 = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+      // exercising a legal mirror operation to make sure things are working 
properly
+      try (Connection connection = factoryServer1.createConnection()) {
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         Queue queue = session.createQueue(getQueueName());
+         MessageProducer producer = session.createProducer(queue);
+         MessageConsumer consumer = session.createConsumer(queue);
+         connection.start();
+         producer.send(session.createMessage());
+         session.commit();
+         assertNotNull(consumer.receive(5000));
+         session.commit();
+      }
+
+      org.apache.activemq.artemis.core.server.Queue snfOnServer1 = 
server.locateQueue(snfOnServer1Name);
+      Wait.assertEquals(0L, snfOnServer1::getMessageCount, 5000, 100);
+
+
+      // stopping the server to let things accumulate so we can assert the SNF
+      server_2.stop();
+
+      tempAddressSendAndReceive(factoryServer1, snfOnServer1, true);
+      tempAddressSendAndReceive(factoryServer1, snfOnServer1, false);
 
       server_2.stop();
       server.stop();
    }
 
+   private void tempAddressSendAndReceive(ConnectionFactory factoryServer1, 
org.apache.activemq.artemis.core.server.Queue snfQueue, boolean useTopic) 
throws Exception {
+      String addressName;
+      try (Connection connectionServer1 = factoryServer1.createConnection()) {
+         Session session = connectionServer1.createSession(true, 
Session.SESSION_TRANSACTED);
+         Destination destination;
+
+         if (useTopic) {
+            TemporaryTopic topic = session.createTemporaryTopic();
+            destination = topic;
+            addressName = topic.getTopicName();
+         } else {
+            TemporaryQueue queue = session.createTemporaryQueue();
+            destination = queue;
+            addressName = queue.getQueueName();
+         }
+
+
+         MessageConsumer consumer = session.createConsumer(destination);
+
+         connectionServer1.start();
+
+         MessageProducer producer = session.createProducer(destination);
+         producer.send(session.createTextMessage());
+         session.commit();
+
+         assertNotNull(consumer.receive(5000));
+
+         session.commit();
+
+         Wait.assertEquals(0L, snfQueue::getMessageCount, 5000, 100);
+
+         // server_2 was already stopped by the caller, so anything wrongly mirrored would accumulate in the SNF
+
+         for (int i = 0; i < 100; i++) {
+            // sends should not make it into the SNF either
+            producer.send(session.createTextMessage());
+         }
+         session.commit();
+         // no temporary sends
+         Wait.assertEquals(0L, snfQueue::getMessageCount, 5000, 100);
+
+         for (int i = 0; i < 100; i++) {
+            assertNotNull(consumer.receive(5000));
+         }
+         session.commit();
+
+         // no temporary acks
+         Wait.assertEquals(0L, snfQueue::getMessageCount, 5000, 100);
+      }
+
+      Wait.assertTrue(() -> 
server.getAddressInfo(SimpleString.of(addressName)) == null, 5000, 100);
+      Wait.assertEquals(0L, snfQueue::getMessageCount, 5000, 100);
+   }
 
    private void checkProperties(Connection connection, javax.jms.Message 
message) throws Exception {
       try (Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE)) {
@@ -325,7 +433,7 @@ public class BrokerInSyncTest extends AmqpClientTestSupport 
{
    private void internalExpiry(boolean useReaper) throws Exception {
       server.setIdentity("Server1");
       {
-         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("to_2", "tcp://localhost:" + 
AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
+         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("to_2", "tcp://localhost:" + 
AMQP_PORT_2).setReconnectAttempts(300).setRetryInterval(100);
          amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setDurable(true));
          server.getConfiguration().addAMQPConnection(amqpConnection);
       }
@@ -429,7 +537,7 @@ public class BrokerInSyncTest extends AmqpClientTestSupport 
{
    public void testDLA() throws Exception {
       server.setIdentity("Server1");
       {
-         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("to_2", "tcp://localhost:" + 
AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
+         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("to_2", "tcp://localhost:" + 
AMQP_PORT_2).setReconnectAttempts(300).setRetryInterval(100);
          amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setDurable(true));
          server.getConfiguration().addAMQPConnection(amqpConnection);
       }
@@ -545,7 +653,7 @@ public class BrokerInSyncTest extends AmqpClientTestSupport 
{
    public void testCreateInternalQueue() throws Exception {
       server.setIdentity("Server1");
       {
-         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("to_2", "tcp://localhost:" + 
AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
+         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("to_2", "tcp://localhost:" + 
AMQP_PORT_2).setReconnectAttempts(300).setRetryInterval(100);
          amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setDurable(true));
          server.getConfiguration().addAMQPConnection(amqpConnection);
       }
@@ -567,7 +675,7 @@ public class BrokerInSyncTest extends AmqpClientTestSupport 
{
       server_2.setIdentity("Server2");
 
       {
-         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("to_1", "tcp://localhost:" + 
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("to_1", "tcp://localhost:" + 
AMQP_PORT).setReconnectAttempts(300).setRetryInterval(100);
          amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setDurable(true));
          server_2.getConfiguration().addAMQPConnection(amqpConnection);
       }
@@ -609,7 +717,7 @@ public class BrokerInSyncTest extends AmqpClientTestSupport 
{
 
       server.setIdentity("Server1");
       {
-         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("to_2", "tcp://localhost:" + 
AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
+         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("to_2", "tcp://localhost:" + 
AMQP_PORT_2).setReconnectAttempts(300).setRetryInterval(100);
          amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setDurable(true));
          server.getConfiguration().addAMQPConnection(amqpConnection);
       }
@@ -636,7 +744,7 @@ public class BrokerInSyncTest extends AmqpClientTestSupport 
{
       server_2.setIdentity("Server2");
 
       {
-         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("to_1", "tcp://localhost:" + 
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("to_1", "tcp://localhost:" + 
AMQP_PORT).setReconnectAttempts(300).setRetryInterval(100);
          amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setDurable(true));
          server_2.getConfiguration().addAMQPConnection(amqpConnection);
       }
@@ -685,7 +793,7 @@ public class BrokerInSyncTest extends AmqpClientTestSupport 
{
       int NUMBER_OF_MESSAGES = 100;
       server.setIdentity("Server1");
       {
-         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + 
AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
+         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + 
AMQP_PORT_2).setReconnectAttempts(300).setRetryInterval(100);
          amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setDurable(true));
          server.getConfiguration().addAMQPConnection(amqpConnection);
       }
@@ -695,7 +803,7 @@ public class BrokerInSyncTest extends AmqpClientTestSupport 
{
       server_2.setIdentity("Server2");
 
       {
-         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + 
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + 
AMQP_PORT).setReconnectAttempts(300).setRetryInterval(100);
          amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setDurable(true));
          server_2.getConfiguration().addAMQPConnection(amqpConnection);
       }
@@ -803,7 +911,7 @@ public class BrokerInSyncTest extends AmqpClientTestSupport 
{
       int NUMBER_OF_MESSAGES = 1;
       server.setIdentity("Server1");
       {
-         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + 
AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
+         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + 
AMQP_PORT_2).setReconnectAttempts(300).setRetryInterval(100);
          amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setDurable(true));
          server.getConfiguration().addAMQPConnection(amqpConnection);
       }
@@ -813,7 +921,7 @@ public class BrokerInSyncTest extends AmqpClientTestSupport 
{
       server_2.setIdentity("Server2");
 
       {
-         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + 
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + 
AMQP_PORT).setReconnectAttempts(300).setRetryInterval(100);
          amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setDurable(true));
          server_2.getConfiguration().addAMQPConnection(amqpConnection);
       }
@@ -911,7 +1019,7 @@ public class BrokerInSyncTest extends 
AmqpClientTestSupport {
       server.setIdentity("Server1");
 
       {
-         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + 
AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
+         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + 
AMQP_PORT_2).setReconnectAttempts(300).setRetryInterval(100);
          amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setDurable(true));
          server.getConfiguration().addAMQPConnection(amqpConnection);
       }
@@ -921,7 +1029,7 @@ public class BrokerInSyncTest extends 
AmqpClientTestSupport {
       server_2.setIdentity("Server2");
 
       {
-         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + 
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + 
AMQP_PORT).setReconnectAttempts(300).setRetryInterval(100);
          amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setDurable(true));
          server_2.getConfiguration().addAMQPConnection(amqpConnection);
       }
@@ -998,7 +1106,7 @@ public class BrokerInSyncTest extends 
AmqpClientTestSupport {
       String queueName = "testSyncLargeMessage";
       server.setIdentity("Server1");
       {
-         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + 
AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
+         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + 
AMQP_PORT_2).setReconnectAttempts(300).setRetryInterval(100);
          amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setDurable(true));
          server.getConfiguration().addAMQPConnection(amqpConnection);
       }
@@ -1008,7 +1116,7 @@ public class BrokerInSyncTest extends 
AmqpClientTestSupport {
       server_2.setIdentity("Server2");
 
       {
-         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + 
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + 
AMQP_PORT).setReconnectAttempts(300).setRetryInterval(100);
          amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setDurable(true));
          server_2.getConfiguration().addAMQPConnection(amqpConnection);
       }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/StopDuringMirrorTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/StopDuringMirrorTest.java
index f4df4e7bf7..732a7f9c5c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/StopDuringMirrorTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/StopDuringMirrorTest.java
@@ -181,8 +181,8 @@ public class StopDuringMirrorTest extends ActiveMQTestBase {
       }
 
       @Override
-      public void deleteQueue(SimpleString addressName, SimpleString queueName) throws Exception {
-         target.deleteQueue(addressName, queueName);
+      public void deleteQueue(SimpleString addressName, SimpleString queueName, QueueConfiguration configuration) throws Exception {
+         target.deleteQueue(addressName, queueName, configuration);
       }
 
       @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to