This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 3bc3d93ed4 ARTEMIS-5627 Disconnect consumers during a Mirror ACK Retry
3bc3d93ed4 is described below

commit 3bc3d93ed404c325d97d8c8ff8522082ebf63317
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Aug 20 13:39:47 2025 -0400

    ARTEMIS-5627 Disconnect consumers during a Mirror ACK Retry
---
 .../api/config/ActiveMQDefaultConfiguration.java   |   6 +
 .../protocol/amqp/broker/AMQPSessionCallback.java  |   5 +
 .../connect/mirror/AMQPMirrorControllerSource.java |  10 +-
 .../connect/mirror/AMQPMirrorControllerTarget.java |   1 +
 .../protocol/amqp/connect/mirror/AckManager.java   |  39 ++++-
 .../core/protocol/mqtt/MQTTSessionCallback.java    |   8 +
 .../core/protocol/openwire/amq/AMQSession.java     |   7 +
 .../artemis/core/protocol/stomp/StompSession.java  |   7 +
 .../artemis/core/config/Configuration.java         |   8 +
 .../core/config/impl/ConfigurationImpl.java        |  13 ++
 .../deployers/impl/FileConfigurationParser.java    |   3 +
 .../impl/ManagementRemotingConnection.java         |   5 +
 .../protocol/core/impl/CoreSessionCallback.java    |   8 +
 .../activemq/artemis/core/server/Consumer.java     |   7 +
 .../apache/activemq/artemis/core/server/Queue.java |   3 +
 .../artemis/core/server/impl/QueueImpl.java        |   5 +
 .../core/server/impl/ServerConsumerImpl.java       |   5 +
 .../artemis/spi/core/protocol/SessionCallback.java |   2 +
 .../resources/schema/artemis-configuration.xsd     |   8 +
 .../core/config/impl/FileConfigurationTest.java    |   1 +
 .../resources/ConfigurationTest-full-config.xml    |   1 +
 .../ConfigurationTest-xinclude-config.xml          |   1 +
 .../ConfigurationTest-xinclude-schema-config.xml   |   1 +
 .../unit/core/server/impl/fakes/FakeConsumer.java  |   1 -
 .../amqp/connect/DisconnectConsumerMirrorTest.java | 163 +++++++++++++++++++++
 .../tests/integration/cli/DummyServerConsumer.java |   1 -
 .../tests/integration/client/HangConsumerTest.java |   5 +
 .../mirror/LargeAccumulationTest.java              | 116 +++++++--------
 28 files changed, 365 insertions(+), 75 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index c7f7af4ec5..353ad90d9f 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -720,6 +720,8 @@ public final class ActiveMQDefaultConfiguration {
    private static final boolean DEFAULT_MIRROR_ACK_MANAGER_WARN_UNACKED = 
false;
    private static final boolean DEFAULT_MIRROR_PAGE_TRANSACTION = false;
 
+   private static boolean DEFAULT_MIRROR_DISCONNECT_CONSUMERS = false;
+
    private static final boolean DEFAULT_PURGE_PAGE_FOLDERS = false;
 
    private static final int DEFAULT_CLUSTER_TOPOLOGY_SCANNER_ATTEMPTS = 30;
@@ -2012,6 +2014,10 @@ public final class ActiveMQDefaultConfiguration {
       return DEFAULT_MIRROR_ACK_MANAGER_WARN_UNACKED;
    }
 
+   public static boolean getMirrorAckManagerDisconnectConsumers() {
+      return DEFAULT_MIRROR_DISCONNECT_CONSUMERS;
+   }
+
    public static int getMirrorAckManagerRetryDelay() {
       return DEFAULT_MIRROR_ACK_MANAGER_RETRY_DELAY;
    }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 988ebdd5dc..d2f87961a0 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -789,6 +789,11 @@ public class AMQPSessionCallback implements 
SessionCallback {
       });
    }
 
+   @Override
+   public void failConnection(String errorMessage) {
+      serverSession.getRemotingConnection().fail(new 
ActiveMQException(errorMessage));
+   }
+
    @Override
    public boolean hasCredits(ServerConsumer consumer) {
       ProtonServerSenderContext plugSender = (ProtonServerSenderContext) 
consumer.getProtocolContext();
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
index fae26f0293..d7192d86ef 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
@@ -298,7 +298,7 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
       }
       String remoteID = getRemoteMirrorId();
       if (remoteID == null) {
-         // This is to avoid a reflection (Miror sendin messages back to 
itself) from a small period of time one node reconnects but not the opposite 
direction.
+         // This is to avoid a reflection (Mirror sending messages back to 
itself) from a small period of time one node reconnects but not the opposite 
direction.
          Object localRemoteID = message.getAnnotation(BROKER_ID_SIMPLE_STRING);
          if (localRemoteID != null) {
             remoteID = String.valueOf(localRemoteID);
@@ -341,20 +341,22 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
       SimpleString address = context.getAddress(message);
 
       if (context.isInternal()) {
-         logger.trace("sendMessage::server {} is discarding send to avoid 
sending to internal queue", server);
+         logger.trace("sendMessage::server {} is discarding send {} to avoid 
sending to internal queue", server, message);
          return;
       }
 
       if (invalidTarget(context.getMirrorSource(), message)) {
-         logger.trace("sendMessage::server {} is discarding send to avoid 
infinite loop (reflection with the mirror)", server);
+         logger.trace("sendMessage::server {} is discarding send {} to avoid 
infinite loop (reflection with the mirror)", server, message);
          return;
       }
 
       if (ignoreAddress(address)) {
-         logger.trace("sendMessage::server {} is discarding send to address 
{}, address doesn't match filter", server, address);
+         logger.trace("sendMessage::server {} is discarding send {} to address 
{}, address doesn't match filter", server, address, message);
          return;
       }
 
+      logger.trace("sendMessage::server {} is SENDING {}", server, message);
+
       try {
          context.setReusable(false);
 
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
index e1e3020c55..485bb298c5 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
@@ -517,6 +517,7 @@ public class AMQPMirrorControllerTarget extends 
ProtonAbstractReceiver implement
       if (internalMirrorID == null) {
          internalMirrorID = getRemoteMirrorId(); // not passing the ID means 
the data was generated on the remote broker
       }
+
       Long internalIDLong = (Long) 
deliveryAnnotations.getValue().get(INTERNAL_ID);
       String internalAddress = (String) 
deliveryAnnotations.getValue().get(INTERNAL_DESTINATION);
 
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
index a7f1b8654f..747bc812dc 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
@@ -47,9 +47,12 @@ import org.apache.activemq.artemis.core.paging.impl.Page;
 import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
 import 
org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
 import 
org.apache.activemq.artemis.core.persistence.impl.journal.codec.AckRetry;
+import org.apache.activemq.artemis.core.postoffice.Bindings;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
@@ -255,7 +258,7 @@ public class AckManager implements ActiveMQComponent {
 
    // to be used with the same executor as the PagingStore executor
    public void retryAddress(SimpleString address, 
LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> acksToRetry) {
-
+      checkConsumers(address);
 
       // This is an optimization:
       // we peek at how many records we currently have. When we scan all the 
records that were initially input we would
@@ -379,7 +382,7 @@ public class AckManager implements ActiveMQComponent {
                }
             }
          } else {
-            logger.trace("Retry {} queue attempted {} times on paging, 
QueueAttempts {} Configuration Page Attempts={}", retry, 
retry.getQueueAttempts(), retry.getPageAttempts(), 
configuration.getMirrorAckManagerPageAttempts());
+            logger.trace("Retry {} attempted {} times on paging, Configuration 
Page Attempts={}", retry, retry.getPageAttempts(), 
configuration.getMirrorAckManagerPageAttempts());
          }
       }
    }
@@ -512,6 +515,36 @@ public class AckManager implements ActiveMQComponent {
       }
    }
 
+   private void checkConsumers(SimpleString address) {
+      if (configuration.isMirrorDisconnectConsumers()) {
+         try {
+            Bindings bindings = 
server.getPostOffice().getBindingsForAddress(address);
+            bindings.forEach((n, b) -> {
+               if (b instanceof LocalQueueBinding) {
+                  Queue queue = ((LocalQueueBinding) b).getQueue();
+                  checkConsumers(queue);
+               }
+            });
+         } catch (Exception e) {
+            // nothing that we can do here other than log
+            logger.warn(e.getMessage(), e);
+         }
+      }
+   }
+
+   private void checkConsumers(Queue queue) {
+      if (configuration.isMirrorDisconnectConsumers() && 
queue.getConsumerCount() > 0) {
+         if (logger.isDebugEnabled()) {
+            logger.debug("Disconnecting consumers on queue {}", 
queue.getName());
+         }
+         queue.forEachConsumer(this::failConsumer);
+      }
+   }
+
+   private void failConsumer(Consumer consumer) {
+      consumer.failConnection("Mirror requesting consumers away to perform 
proper ACK retries");
+   }
+
    public boolean ack(String nodeID, Queue targetQueue, long messageID, 
AckReason reason, boolean allowRetry) {
       if (logger.isTraceEnabled()) {
          logger.trace("performAck (nodeID={}, messageID={}), targetQueue={}, 
allowRetry={})", nodeID, messageID, targetQueue.getName(), allowRetry);
@@ -525,6 +558,8 @@ public class AckManager implements ActiveMQComponent {
          }
 
          if (allowRetry) {
+            checkConsumers(targetQueue);
+
             if (configuration != null && 
configuration.isMirrorAckManagerWarnUnacked() && targetQueue.getConsumerCount() 
> 0) {
                
ActiveMQAMQPProtocolLogger.LOGGER.unackWithConsumer(targetQueue.getConsumerCount(),
 targetQueue.getName(), nodeID, messageID);
             } else {
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index 9b80d5b66f..c872e95ab0 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.protocol.mqtt;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
@@ -44,6 +45,13 @@ public class MQTTSessionCallback implements SessionCallback {
       return connection.isWritable(callback);
    }
 
+
+   @Override
+   public void failConnection(String errorMessage) {
+      connection.fail(new ActiveMQException(errorMessage));
+   }
+
+
    @Override
    public int sendMessage(MessageReference ref,
                           ServerConsumer consumer,
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 4b35b2e656..d44748b73f 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.AutoCreateResult;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
@@ -283,6 +284,12 @@ public class AMQSession implements SessionCallback {
       return connection.isWritable(callback);
    }
 
+   @Override
+   public void failConnection(String errorMessage) {
+      connection.fail(new ActiveMQException(errorMessage));
+   }
+
+
    @Override
    public void sendProducerCreditsMessage(int credits, SimpleString address) {
       // TODO Auto-generated method stub
diff --git 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index dd85ac6bd2..bac854868b 100644
--- 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import io.netty.channel.EventLoop;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
@@ -118,6 +119,12 @@ public class StompSession implements SessionCallback {
       return connection.isWritable(callback);
    }
 
+   @Override
+   public void failConnection(String errorMessage) {
+      connection.fail(new ActiveMQException(errorMessage));
+   }
+
+
    void setServerSession(ServerSession session) {
       this.session = session;
    }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index 1effd74b11..eb1b2c0a4c 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -1526,6 +1526,14 @@ public interface Configuration {
 
    boolean isMirrorAckManagerWarnUnacked();
 
+   /**
+    * Should Mirror disconnect consumers in order to clearn Acknowledgement 
retries.
+    * This is useful in situations where you want consumers connected to only 
one side of the Mirrors.
+    * */
+   Configuration setMirrorDisconnectConsumers(boolean disconnectConsumers);
+
+   boolean isMirrorDisconnectConsumers();
+
    /**
     *  Should the system remove page folders once destinations stop paging.
     *  Default is false, however future major versions will have this as true 
*/
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index 6c5738a13b..72d2a2ed53 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -466,6 +466,8 @@ public class ConfigurationImpl implements Configuration, 
Serializable {
 
    private boolean mirrorAckManagerWarnUnacked = 
ActiveMQDefaultConfiguration.getMirrorAckManagerWarnUnacked();
 
+   private boolean mirrorDisconnectConsumers = 
ActiveMQDefaultConfiguration.getMirrorAckManagerDisconnectConsumers();
+
    private int mirrorAckManagerRetryDelay = 
ActiveMQDefaultConfiguration.getMirrorAckManagerRetryDelay();
 
    private boolean mirrorPageTransaction = 
ActiveMQDefaultConfiguration.getMirrorPageTransaction();
@@ -3392,6 +3394,17 @@ public class ConfigurationImpl implements Configuration, 
Serializable {
       return this;
    }
 
+   @Override
+   public ConfigurationImpl setMirrorDisconnectConsumers(boolean 
disconnectConsumers) {
+      this.mirrorDisconnectConsumers = disconnectConsumers;
+      return this;
+   }
+
+   @Override
+   public boolean isMirrorDisconnectConsumers() {
+      return mirrorDisconnectConsumers;
+   }
+
    @Override
    public ConfigurationImpl setMirrorAckManagerQueueAttempts(int 
minQueueAttempts) {
       logger.debug("Setting mirrorAckManagerMinQueueAttempts = {}", 
minQueueAttempts);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 8f100cf59a..dd250dc181 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -395,6 +395,7 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
 
    private static final String MIRROR_ACK_MANAGER_RETRY_DELAY = 
"mirror-ack-manager-retry-delay";
    private static final String MIRROR_ACK_MANAGER_WARN_UNACKED = 
"mirror-ack-manager-warn-unacked";
+   private static final String MIRROR_DISCONNECT_CONSUMERS = 
"mirror-disconnect-consumers";
 
    private static final String MIRROR_PAGE_TRANSACTION = 
"mirror-page-transaction";
 
@@ -889,6 +890,8 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
 
       config.setMirrorAckManagerWarnUnacked(getBoolean(e, 
MIRROR_ACK_MANAGER_WARN_UNACKED, config.isMirrorAckManagerWarnUnacked()));
 
+      config.setMirrorDisconnectConsumers(getBoolean(e, 
MIRROR_DISCONNECT_CONSUMERS, config.isMirrorDisconnectConsumers()));
+
       parseAddressSettings(e, config);
 
       parseResourceLimits(e, config);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
index b3487a54d0..c35187ccd1 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
@@ -260,6 +260,11 @@ public class ManagementRemotingConnection implements 
RemotingConnection {
       public void disconnect(ServerConsumer consumerId, String message) {
       }
 
+      @Override
+      public void failConnection(String errorMessage) {
+
+      }
+
       @Override
       public boolean isWritable(ReadyListener callback, Object 
protocolContext) {
          return false;
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
index 34eac07137..6cd9deb80d 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.protocol.core.impl;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.protocol.core.Channel;
@@ -80,6 +81,13 @@ public final class CoreSessionCallback implements 
SessionCallback {
       return connection.isWritable(callback);
    }
 
+
+   @Override
+   public void failConnection(String errorMessage) {
+      connection.fail(new ActiveMQException(errorMessage));
+   }
+
+
    @Override
    public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, 
MessageReference ref, boolean failed) {
       return false;
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
index 79b97b0834..0666a494ec 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
@@ -84,6 +84,13 @@ public interface Consumer extends PriorityAware {
     */
    void disconnect();
 
+   /**
+    * disconnect the consumer
+    */
+   default void failConnection(String errorMessage) {
+   }
+
+
    void failed(Throwable t);
 
    /**
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 061a32927b..0da22f18b3 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -167,6 +167,9 @@ public interface Queue extends Bindable, CriticalComponent {
 
    void addConsumer(Consumer consumer) throws Exception;
 
+   default void forEachConsumer(java.util.function.Consumer<Consumer> 
callback) {
+   }
+
    void addLingerSession(String sessionId);
 
    void removeLingerSession(String sessionId);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index aefcba2564..6f88e57430 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1228,6 +1228,11 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
       }
    }
 
+   @Override
+   public void forEachConsumer(java.util.function.Consumer<Consumer> callback) 
{
+      consumers.stream().forEach(t -> callback.accept(t.consumer));
+   }
+
    @Override
    public void addLingerSession(String sessionId) {
       lingerSessionIds.add(sessionId);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 167d9986aa..8a4d56987b 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -1166,6 +1166,11 @@ public class ServerConsumerImpl implements 
ServerConsumer, ReadyListener {
       callback.disconnect(this, "Queue deleted: " + getQueue().getName());
    }
 
+   @Override
+   public void failConnection(String errorMessage) {
+      callback.failConnection(errorMessage);
+   }
+
    @Override
    public void failed(Throwable t) {
       try {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
index ad504739cf..75b81a8645 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
@@ -79,6 +79,8 @@ public interface SessionCallback {
 
    boolean isWritable(ReadyListener callback, Object protocolContext);
 
+   void failConnection(String errorMessage);
+
    /**
     * Some protocols (Openwire) needs a special message with the browser is 
finished.
     */
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd 
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 5e87d2e00f..2e9baeb018 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -998,6 +998,14 @@
             </xsd:annotation>
          </xsd:element>
 
+         <xsd:element name="mirror-disconnect-consumers" type="xsd:boolean" 
maxOccurs="1" minOccurs="0" default="false">
+            <xsd:annotation>
+               <xsd:documentation>
+                  Should mirror disconnect consumers if needed to proceed with 
acks to minimize chances of a missing ack.
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+
          <xsd:element name="suppress-session-notifications" type="xsd:boolean" 
default="false" maxOccurs="1" minOccurs="0">
             <xsd:annotation>
                <xsd:documentation>
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index b59bc1ccbb..9823853988 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -597,6 +597,7 @@ public class FileConfigurationTest extends 
AbstractConfigurationTestBase {
 
       assertEquals(111, configInstance.getMirrorAckManagerQueueAttempts());
       assertTrue(configInstance.isMirrorAckManagerWarnUnacked());
+      assertTrue(configInstance.isMirrorDisconnectConsumers());
       assertEquals(222, configInstance.getMirrorAckManagerPageAttempts());
       assertEquals(333, configInstance.getMirrorAckManagerRetryDelay());
       assertTrue(configInstance.isMirrorPageTransaction());
diff --git 
a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml 
b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index b9935e85bb..45e9c42401 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -601,6 +601,7 @@
       <mirror-ack-manager-retry-delay>333</mirror-ack-manager-retry-delay>
       <mirror-ack-manager-warn-unacked>true</mirror-ack-manager-warn-unacked>
       <mirror-page-transaction>true</mirror-page-transaction>
+      <mirror-disconnect-consumers>true</mirror-disconnect-consumers>
 
       <security-settings>
          <security-setting match="a1">
diff --git 
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml 
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
index 803eed9747..80436e0dc2 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
@@ -79,6 +79,7 @@
       <mirror-ack-manager-retry-delay>333</mirror-ack-manager-retry-delay>
       <mirror-ack-manager-warn-unacked>true</mirror-ack-manager-warn-unacked>
       <mirror-page-transaction>true</mirror-page-transaction>
+      <mirror-disconnect-consumers>true</mirror-disconnect-consumers>
 
       <remoting-incoming-interceptors>
          
<class-name>org.apache.activemq.artemis.tests.unit.core.config.impl.TestInterceptor1</class-name>
diff --git 
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
 
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
index eec3aae8e7..ad1bda4628 100644
--- 
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
+++ 
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
@@ -79,6 +79,7 @@
       <mirror-ack-manager-retry-delay>333</mirror-ack-manager-retry-delay>
       <mirror-ack-manager-warn-unacked>true</mirror-ack-manager-warn-unacked>
       <mirror-page-transaction>true</mirror-page-transaction>
+      <mirror-disconnect-consumers>true</mirror-disconnect-consumers>
 
 
       <xi:include 
href="${xincludePath}/ConfigurationTest-xinclude-schema-config-remoting-incoming-interceptors.xml"/>
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
index 022293328a..1a6954e4a3 100644
--- 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
@@ -154,7 +154,6 @@ public class FakeConsumer implements Consumer {
 
    @Override
    public void disconnect() {
-      //To change body of implemented methods use File | Settings | File 
Templates.
    }
 
    @Override
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/DisconnectConsumerMirrorTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/DisconnectConsumerMirrorTest.java
new file mode 100644
index 0000000000..e968de79f0
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/DisconnectConsumerMirrorTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class DisconnectConsumerMirrorTest extends ActiveMQTestBase {
+
+   private static final int NUMBER_OF_MESSAGES = 5;
+
+   ActiveMQServer server1;
+   ActiveMQServer server2;
+
+   @Test
+   public void testDisconnectConsumers() throws Exception {
+
+      try {
+         String queueName = getName();
+
+         {
+            Configuration configuration = createDefaultConfig(0, false);
+            configuration.setMirrorDisconnectConsumers(true);
+            configuration.getAddressConfigurations().clear();
+            configuration.setResolveProtocols(true);
+            
configuration.setMirrorAckManagerRetryDelay(100).setMirrorAckManagerPageAttempts(5).setMirrorAckManagerQueueAttempts(5);
+            
configuration.addQueueConfiguration(QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST));
+            configuration.addAcceptorConfiguration("clients", 
"tcp://localhost:61616");
+            AMQPBrokerConnectConfiguration brokerConnectConfiguration = new 
AMQPBrokerConnectConfiguration("toDC2", 
"tcp://localhost:61617").setRetryInterval(100).setReconnectAttempts(-1);
+            AMQPMirrorBrokerConnectionElement mirror = new 
AMQPMirrorBrokerConnectionElement().setDurable(true);
+            brokerConnectConfiguration.addMirror(mirror);
+            configuration.addAMQPConnection(brokerConnectConfiguration);
+            server1 = createServer(true, configuration);
+            server1.setIdentity("server1");
+            server1.start();
+         }
+
+         {
+            Configuration configuration = createDefaultConfig(1, false);
+            configuration.setMirrorDisconnectConsumers(true);
+            configuration.setResolveProtocols(true);
+            
configuration.setMirrorAckManagerRetryDelay(100).setMirrorAckManagerPageAttempts(5).setMirrorAckManagerQueueAttempts(5);
+            
configuration.addQueueConfiguration(QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST));
+            configuration.addAcceptorConfiguration("clients", 
"tcp://localhost:61617");
+            AMQPBrokerConnectConfiguration brokerConnectConfiguration = new 
AMQPBrokerConnectConfiguration("toDC1", 
"tcp://localhost:61616").setRetryInterval(100).setReconnectAttempts(-1);
+            AMQPMirrorBrokerConnectionElement mirror = new 
AMQPMirrorBrokerConnectionElement();
+            brokerConnectConfiguration.addMirror(mirror);
+            configuration.addAMQPConnection(brokerConnectConfiguration);
+            server2 = createServer(true, configuration);
+            server2.setIdentity("server2");
+            server2.start();
+         }
+
+         validateProtocol("AMQP", queueName);
+         validateProtocol("CORE", queueName);
+         validateProtocol("OPENWIRE", queueName);
+      } finally {
+         server1.stop();
+         server2.stop();
+
+         server1 = null;
+         server2 = null;
+      }
+   }
+
+   private void validateProtocol(String protocol, String queueName) throws 
Exception {
+      Queue mirrorQueue2 = 
server2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_toDC1");
+      assertNotNull(mirrorQueue2);
+
+      Queue mirrorQueue1 = 
server1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_toDC2");
+      assertNotNull(mirrorQueue1);
+
+      ConnectionFactory factory1 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:61616");
+      ConnectionFactory factory2 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:61617");
+
+      try (Connection connection = factory2.createConnection()) {
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         MessageProducer producer = 
session.createProducer(session.createQueue(queueName));
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            producer.send(session.createTextMessage("hello " + i));
+         }
+         session.commit();
+      }
+
+      try (Connection connection1 = factory1.createConnection(); Connection 
connection2 = factory2.createConnection()) {
+
+         connection1.start();
+         connection2.start();
+
+         Session session1 = connection1.createSession(true, 
Session.SESSION_TRANSACTED);
+         MessageConsumer consumer_server1 = 
session1.createConsumer(session1.createQueue(queueName));
+         assertNotNull(consumer_server1.receive(5000));
+
+         Session session2 = connection2.createSession(true, 
Session.SESSION_TRANSACTED);
+         MessageConsumer consumer_server2 = 
session2.createConsumer(session1.createQueue(queueName));
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            assertNotNull(consumer_server2.receive(5000));
+         }
+         session2.commit();
+
+         Wait.assertEquals(0, mirrorQueue1::getMessageCount);
+         Wait.assertEquals(0, mirrorQueue2::getMessageCount);
+
+         verifyNoMessages(server1, server2, queueName);
+
+         // Consumers on server1 were supposed to be disconnected
+         // as instructed on MirrorDisconnectConsumers
+         Assertions.assertThrows(JMSException.class, () -> {
+            consumer_server1.receive(5000);
+         });
+      }
+   }
+
+   private void verifyNoMessages(ActiveMQServer server1,
+                                 ActiveMQServer server2,
+                                 String queueName) throws Exception {
+      Queue queueServer1 = server1.locateQueue(queueName);
+      Queue queueServer2 = server2.locateQueue(queueName);
+
+      Queue mirrorQueue1 = 
server1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_toDC2");
+      Queue mirrorQueue2 = 
server2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_toDC1");
+
+      Wait.assertEquals(0L, mirrorQueue1::getMessageCount, 5000, 100);
+      Wait.assertEquals(0L, mirrorQueue2::getMessageCount, 5000, 100);
+
+      Wait.assertEquals(0L, queueServer1::getMessageCount, 5000, 100);
+      Wait.assertEquals(0L, queueServer2::getMessageCount, 5000, 100);
+   }
+}
\ No newline at end of file
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
index 6483f07ce9..b27519feb6 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
@@ -249,7 +249,6 @@ public class DummyServerConsumer implements ServerConsumer {
 
    @Override
    public void disconnect() {
-
    }
 
    @Override
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index 46901169b7..0c539622a3 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -519,6 +519,11 @@ public class HangConsumerTest extends ActiveMQTestBase {
          return targetCallback.sendLargeMessageContinuation(consumer, body, 
continues, requiresResponse);
       }
 
+      @Override
+      public void failConnection(String errorMessage) {
+         targetCallback.failConnection(errorMessage);
+      }
+
       @Override
       public void closed() {
          targetCallback.closed();
diff --git 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/LargeAccumulationTest.java
 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/LargeAccumulationTest.java
index 612460ab59..b46dbee66a 100644
--- 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/LargeAccumulationTest.java
+++ 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/LargeAccumulationTest.java
@@ -20,6 +20,7 @@ package 
org.apache.activemq.artemis.tests.soak.brokerConnection.mirror;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
+import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
@@ -46,6 +47,7 @@ import org.apache.activemq.artemis.util.ServerUtil;
 import org.apache.activemq.artemis.utils.FileUtil;
 import org.apache.activemq.artemis.utils.TestParameters;
 import org.apache.activemq.artemis.utils.Wait;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -123,6 +125,7 @@ public class LargeAccumulationTest extends SoakTestBase {
       brokerProperties.put("AMQPConnections." + connectionName + ".type", 
AMQPBrokerConnectionAddressType.MIRROR.toString());
       brokerProperties.put("AMQPConnections." + connectionName + 
".connectionElements.mirror.sync", "false");
       brokerProperties.put("largeMessageSync", "false");
+      brokerProperties.put("mirrorDisconnectConsumers", "true");
       brokerProperties.put("pageSyncTimeout", "" + 
TimeUnit.MILLISECONDS.toNanos(1));
       brokerProperties.put("messageExpiryScanPeriod", "-1");
 
@@ -154,7 +157,6 @@ public class LargeAccumulationTest extends SoakTestBase {
          + "logger.db1.level=DEBUG"));
    }
 
-
    @BeforeAll
    public static void createServers() throws Exception {
       createServer(DC1_NODE_A, "mirror", DC2_NODEA_URI, 0);
@@ -262,9 +264,6 @@ public class LargeAccumulationTest extends SoakTestBase {
    @Test
    public void testLargeAccumulation() throws Exception {
 
-      final boolean useTopic = true;
-      final boolean useQueue = true;
-
       AtomicInteger errors = new AtomicInteger(0);
 
       // producers will have 2 sets of producers (queue and topic)
@@ -297,70 +296,57 @@ public class LargeAccumulationTest extends SoakTestBase {
          }
       }
 
+      Connection connectionOnServer2 = cfs[1].createConnection();
+      runAfter(connectionOnServer2::close);
+
+      Session sessionOnSrv2 = connectionOnServer2.createSession(true, 
Session.SESSION_TRANSACTED);
+
+      MessageConsumer deadConsumer = 
sessionOnSrv2.createConsumer(sessionOnSrv2.createQueue(QUEUE_NAME));
+
       CountDownLatch doneTopic = null, doneQueue = null;
 
-      if (useTopic) {
-         doneTopic = send(service, errors, NUMBER_OF_THREADS, cfs[0], 
NUMBER_OF_LARGE_MESSAGES, 10, SIZE_OF_LARGE_MESSAGE, largeTopic, 
"LargeMessageTopic");
-      }
-      if (useQueue) {
-         doneQueue = send(service, errors, NUMBER_OF_THREADS, cfs[0], 
NUMBER_OF_LARGE_MESSAGES, 10, SIZE_OF_LARGE_MESSAGE, largeQueue, 
"LargeMessageQueue");
-      }
+      doneTopic = send(service, errors, NUMBER_OF_THREADS, cfs[0], 
NUMBER_OF_LARGE_MESSAGES, 10, SIZE_OF_LARGE_MESSAGE, largeTopic, 
"LargeMessageTopic");
+      doneQueue = send(service, errors, NUMBER_OF_THREADS, cfs[0], 
NUMBER_OF_LARGE_MESSAGES, 10, SIZE_OF_LARGE_MESSAGE, largeQueue, 
"LargeMessageQueue");
 
-      if (useTopic) {
-         assertTrue(doneTopic.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
-      }
-      if (useQueue) {
-         assertTrue(doneQueue.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
-      }
+      assertTrue(doneTopic.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
+      assertTrue(doneQueue.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
 
       assertEquals(0, errors.get());
 
-      if (useTopic) {
-         doneTopic = send(service, errors, NUMBER_OF_THREADS, cfs[0], 
NUMBER_OF_REGULAR_MESSAGES, 100, SIZE_OF_REGULAR_MESSAGE, largeTopic, 
"MediumMessageTopic");
-      }
-      if (useQueue) {
-         doneQueue = send(service, errors, NUMBER_OF_THREADS, cfs[0], 
NUMBER_OF_REGULAR_MESSAGES, 100, SIZE_OF_REGULAR_MESSAGE, largeQueue, 
"MediumMessageQueue");
-      }
+      doneTopic = send(service, errors, NUMBER_OF_THREADS, cfs[0], 
NUMBER_OF_REGULAR_MESSAGES, 100, SIZE_OF_REGULAR_MESSAGE, largeTopic, 
"MediumMessageTopic");
+      doneQueue = send(service, errors, NUMBER_OF_THREADS, cfs[0], 
NUMBER_OF_REGULAR_MESSAGES, 100, SIZE_OF_REGULAR_MESSAGE, largeQueue, 
"MediumMessageQueue");
 
-      if (useTopic) {
-         assertTrue(doneTopic.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
-      }
-      if (useQueue) {
-         assertTrue(doneQueue.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
-      }
+      assertTrue(doneTopic.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
+      assertTrue(doneQueue.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
       assertEquals(0, errors.get());
 
-      matchMessageCounts(sm, (long) (NUMBER_OF_LARGE_MESSAGES + 
NUMBER_OF_REGULAR_MESSAGES) * NUMBER_OF_THREADS, useTopic, useQueue, true);
+      matchMessageCounts(sm, (long) (NUMBER_OF_LARGE_MESSAGES + 
NUMBER_OF_REGULAR_MESSAGES) * NUMBER_OF_THREADS, true);
 
-      if (useQueue) {
-         doneQueue = new CountDownLatch(NUMBER_OF_THREADS);
-         consume(service, errors, NUMBER_OF_THREADS, cfs[0], 
NUMBER_OF_LARGE_MESSAGES + NUMBER_OF_REGULAR_MESSAGES, 100, null, largeQueue, 
doneQueue);
-      }
-      if (useTopic) {
-         doneTopic = new CountDownLatch(NUMBER_OF_THREADS * 
NUMBER_OF_SUBSCRIPTIONS);
-         for (int i = 0; i < NUMBER_OF_SUBSCRIPTIONS; i++) {
-            consume(service, errors, NUMBER_OF_THREADS, cfs[0], 
NUMBER_OF_LARGE_MESSAGES + NUMBER_OF_REGULAR_MESSAGES, 100, "sub_" + i, 
largeTopic, doneTopic);
-         }
+      doneQueue = new CountDownLatch(NUMBER_OF_THREADS);
+      consume(service, errors, NUMBER_OF_THREADS, cfs[0], 
NUMBER_OF_LARGE_MESSAGES + NUMBER_OF_REGULAR_MESSAGES, 100, null, largeQueue, 
doneQueue);
+      doneTopic = new CountDownLatch(NUMBER_OF_THREADS * 
NUMBER_OF_SUBSCRIPTIONS);
+      for (int i = 0; i < NUMBER_OF_SUBSCRIPTIONS; i++) {
+         consume(service, errors, NUMBER_OF_THREADS, cfs[0], 
NUMBER_OF_LARGE_MESSAGES + NUMBER_OF_REGULAR_MESSAGES, 100, "sub_" + i, 
largeTopic, doneTopic);
       }
 
-      if (useTopic) {
-         assertTrue(doneTopic.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
-      }
+      assertTrue(doneTopic.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
 
-      if (useQueue) {
-         assertTrue(doneQueue.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
-      }
+      assertTrue(doneQueue.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
 
       assertEquals(0, errors.get());
 
-      matchMessageCounts(sm, 0, useTopic, useQueue, true);
+      matchMessageCounts(sm, 0, true);
+
+      // deadConsumer was supposed to be disconnected from the ack manager
+      Assertions.assertThrows(JMSException.class, () -> {
+         deadConsumer.receive(5000);
+      });
+
+      connectionOnServer2.close();
+
    }
 
-   private boolean matchMessageCounts(SimpleManagement[] sm,
-                                      long numberOfMessages,
-                                      boolean useTopic,
-                                      boolean useQueue,
-                                      boolean useWait) throws Exception {
+   private boolean matchMessageCounts(SimpleManagement[] sm, long 
numberOfMessages, boolean useWait) throws Exception {
       for (SimpleManagement s : sm) {
          logger.debug("Checking counts on SNF for {}", s.getUri());
          if (useWait) {
@@ -371,29 +357,25 @@ public class LargeAccumulationTest extends SoakTestBase {
             }
          }
 
-         if (useTopic) {
-            for (int i = 0; i < NUMBER_OF_SUBSCRIPTIONS; i++) {
-               String subscriptionName = "sub_" + i + ":global";
-               logger.debug("Checking counts on {} on {}", subscriptionName, 
s.getUri());
-               if (useWait) {
-                  Wait.assertEquals(numberOfMessages, () -> 
s.getMessageCountOnQueue(subscriptionName), 
TimeUnit.MINUTES.toMillis(LARGE_TIMEOUT_MINUTES), 100);
-               } else {
-                  if (s.getMessageCountOnQueue(subscriptionName) != 
numberOfMessages) {
-                     return false;
-                  }
-               }
-            }
-         }
-
-         if (useQueue) {
+         for (int i = 0; i < NUMBER_OF_SUBSCRIPTIONS; i++) {
+            String subscriptionName = "sub_" + i + ":global";
+            logger.debug("Checking counts on {} on {}", subscriptionName, 
s.getUri());
             if (useWait) {
-               Wait.assertEquals(numberOfMessages, () -> 
s.getMessageCountOnQueue(QUEUE_NAME), 
TimeUnit.MINUTES.toMillis(LARGE_TIMEOUT_MINUTES), 100);
+               Wait.assertEquals(numberOfMessages, () -> 
s.getMessageCountOnQueue(subscriptionName), 
TimeUnit.MINUTES.toMillis(LARGE_TIMEOUT_MINUTES), 100);
             } else {
-               if (s.getMessageCountOnQueue(QUEUE_NAME) != numberOfMessages) {
+               if (s.getMessageCountOnQueue(subscriptionName) != 
numberOfMessages) {
                   return false;
                }
             }
          }
+
+         if (useWait) {
+            Wait.assertEquals(numberOfMessages, () -> 
s.getMessageCountOnQueue(QUEUE_NAME), 
TimeUnit.MINUTES.toMillis(LARGE_TIMEOUT_MINUTES), 100);
+         } else {
+            if (s.getMessageCountOnQueue(QUEUE_NAME) != numberOfMessages) {
+               return false;
+            }
+         }
       }
       return true;
    }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to