This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 3bc3d93ed4 ARTEMIS-5627 Disconnect consumers during a Mirror ACK Retry
3bc3d93ed4 is described below
commit 3bc3d93ed404c325d97d8c8ff8522082ebf63317
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Aug 20 13:39:47 2025 -0400
ARTEMIS-5627 Disconnect consumers during a Mirror ACK Retry
---
.../api/config/ActiveMQDefaultConfiguration.java | 6 +
.../protocol/amqp/broker/AMQPSessionCallback.java | 5 +
.../connect/mirror/AMQPMirrorControllerSource.java | 10 +-
.../connect/mirror/AMQPMirrorControllerTarget.java | 1 +
.../protocol/amqp/connect/mirror/AckManager.java | 39 ++++-
.../core/protocol/mqtt/MQTTSessionCallback.java | 8 +
.../core/protocol/openwire/amq/AMQSession.java | 7 +
.../artemis/core/protocol/stomp/StompSession.java | 7 +
.../artemis/core/config/Configuration.java | 8 +
.../core/config/impl/ConfigurationImpl.java | 13 ++
.../deployers/impl/FileConfigurationParser.java | 3 +
.../impl/ManagementRemotingConnection.java | 5 +
.../protocol/core/impl/CoreSessionCallback.java | 8 +
.../activemq/artemis/core/server/Consumer.java | 7 +
.../apache/activemq/artemis/core/server/Queue.java | 3 +
.../artemis/core/server/impl/QueueImpl.java | 5 +
.../core/server/impl/ServerConsumerImpl.java | 5 +
.../artemis/spi/core/protocol/SessionCallback.java | 2 +
.../resources/schema/artemis-configuration.xsd | 8 +
.../core/config/impl/FileConfigurationTest.java | 1 +
.../resources/ConfigurationTest-full-config.xml | 1 +
.../ConfigurationTest-xinclude-config.xml | 1 +
.../ConfigurationTest-xinclude-schema-config.xml | 1 +
.../unit/core/server/impl/fakes/FakeConsumer.java | 1 -
.../amqp/connect/DisconnectConsumerMirrorTest.java | 163 +++++++++++++++++++++
.../tests/integration/cli/DummyServerConsumer.java | 1 -
.../tests/integration/client/HangConsumerTest.java | 5 +
.../mirror/LargeAccumulationTest.java | 116 +++++++--------
28 files changed, 365 insertions(+), 75 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index c7f7af4ec5..353ad90d9f 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -720,6 +720,8 @@ public final class ActiveMQDefaultConfiguration {
private static final boolean DEFAULT_MIRROR_ACK_MANAGER_WARN_UNACKED =
false;
private static final boolean DEFAULT_MIRROR_PAGE_TRANSACTION = false;
+ private static boolean DEFAULT_MIRROR_DISCONNECT_CONSUMERS = false;
+
private static final boolean DEFAULT_PURGE_PAGE_FOLDERS = false;
private static final int DEFAULT_CLUSTER_TOPOLOGY_SCANNER_ATTEMPTS = 30;
@@ -2012,6 +2014,10 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_MIRROR_ACK_MANAGER_WARN_UNACKED;
}
+ public static boolean getMirrorAckManagerDisconnectConsumers() {
+ return DEFAULT_MIRROR_DISCONNECT_CONSUMERS;
+ }
+
public static int getMirrorAckManagerRetryDelay() {
return DEFAULT_MIRROR_ACK_MANAGER_RETRY_DELAY;
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 988ebdd5dc..d2f87961a0 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -789,6 +789,11 @@ public class AMQPSessionCallback implements
SessionCallback {
});
}
+ @Override
+ public void failConnection(String errorMessage) {
+ serverSession.getRemotingConnection().fail(new
ActiveMQException(errorMessage));
+ }
+
@Override
public boolean hasCredits(ServerConsumer consumer) {
ProtonServerSenderContext plugSender = (ProtonServerSenderContext)
consumer.getProtocolContext();
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
index fae26f0293..d7192d86ef 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
@@ -298,7 +298,7 @@ public class AMQPMirrorControllerSource extends
BasicMirrorController<Sender> im
}
String remoteID = getRemoteMirrorId();
if (remoteID == null) {
- // This is to avoid a reflection (Miror sendin messages back to
itself) from a small period of time one node reconnects but not the opposite
direction.
+ // This is to avoid a reflection (Mirror sending messages back to
itself) from a small period of time one node reconnects but not the opposite
direction.
Object localRemoteID = message.getAnnotation(BROKER_ID_SIMPLE_STRING);
if (localRemoteID != null) {
remoteID = String.valueOf(localRemoteID);
@@ -341,20 +341,22 @@ public class AMQPMirrorControllerSource extends
BasicMirrorController<Sender> im
SimpleString address = context.getAddress(message);
if (context.isInternal()) {
- logger.trace("sendMessage::server {} is discarding send to avoid
sending to internal queue", server);
+ logger.trace("sendMessage::server {} is discarding send {} to avoid
sending to internal queue", server, message);
return;
}
if (invalidTarget(context.getMirrorSource(), message)) {
- logger.trace("sendMessage::server {} is discarding send to avoid
infinite loop (reflection with the mirror)", server);
+ logger.trace("sendMessage::server {} is discarding send {} to avoid
infinite loop (reflection with the mirror)", server, message);
return;
}
if (ignoreAddress(address)) {
- logger.trace("sendMessage::server {} is discarding send to address
{}, address doesn't match filter", server, address);
+ logger.trace("sendMessage::server {} is discarding send {} to address
{}, address doesn't match filter", server, address, message);
return;
}
+ logger.trace("sendMessage::server {} is SENDING {}", server, message);
+
try {
context.setReusable(false);
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
index e1e3020c55..485bb298c5 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
@@ -517,6 +517,7 @@ public class AMQPMirrorControllerTarget extends
ProtonAbstractReceiver implement
if (internalMirrorID == null) {
internalMirrorID = getRemoteMirrorId(); // not passing the ID means
the data was generated on the remote broker
}
+
Long internalIDLong = (Long)
deliveryAnnotations.getValue().get(INTERNAL_ID);
String internalAddress = (String)
deliveryAnnotations.getValue().get(INTERNAL_DESTINATION);
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
index a7f1b8654f..747bc812dc 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
@@ -47,9 +47,12 @@ import org.apache.activemq.artemis.core.paging.impl.Page;
import
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import
org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import
org.apache.activemq.artemis.core.persistence.impl.journal.codec.AckRetry;
+import org.apache.activemq.artemis.core.postoffice.Bindings;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
@@ -255,7 +258,7 @@ public class AckManager implements ActiveMQComponent {
// to be used with the same executor as the PagingStore executor
public void retryAddress(SimpleString address,
LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> acksToRetry) {
-
+ checkConsumers(address);
// This is an optimization:
// we peek at how many records we currently have. When we scan all the
records that were initially input we would
@@ -379,7 +382,7 @@ public class AckManager implements ActiveMQComponent {
}
}
} else {
- logger.trace("Retry {} queue attempted {} times on paging,
QueueAttempts {} Configuration Page Attempts={}", retry,
retry.getQueueAttempts(), retry.getPageAttempts(),
configuration.getMirrorAckManagerPageAttempts());
+ logger.trace("Retry {} attempted {} times on paging, Configuration
Page Attempts={}", retry, retry.getPageAttempts(),
configuration.getMirrorAckManagerPageAttempts());
}
}
}
@@ -512,6 +515,36 @@ public class AckManager implements ActiveMQComponent {
}
}
+ private void checkConsumers(SimpleString address) {
+ if (configuration.isMirrorDisconnectConsumers()) {
+ try {
+ Bindings bindings =
server.getPostOffice().getBindingsForAddress(address);
+ bindings.forEach((n, b) -> {
+ if (b instanceof LocalQueueBinding) {
+ Queue queue = ((LocalQueueBinding) b).getQueue();
+ checkConsumers(queue);
+ }
+ });
+ } catch (Exception e) {
+ // nothing that we can do here other than log
+ logger.warn(e.getMessage(), e);
+ }
+ }
+ }
+
+ private void checkConsumers(Queue queue) {
+ if (configuration.isMirrorDisconnectConsumers() &&
queue.getConsumerCount() > 0) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Disconnecting consumers on queue {}",
queue.getName());
+ }
+ queue.forEachConsumer(this::failConsumer);
+ }
+ }
+
+ private void failConsumer(Consumer consumer) {
+ consumer.failConnection("Mirror requesting consumers away to perform
proper ACK retries");
+ }
+
public boolean ack(String nodeID, Queue targetQueue, long messageID,
AckReason reason, boolean allowRetry) {
if (logger.isTraceEnabled()) {
logger.trace("performAck (nodeID={}, messageID={}), targetQueue={},
allowRetry={})", nodeID, messageID, targetQueue.getName(), allowRetry);
@@ -525,6 +558,8 @@ public class AckManager implements ActiveMQComponent {
}
if (allowRetry) {
+ checkConsumers(targetQueue);
+
if (configuration != null &&
configuration.isMirrorAckManagerWarnUnacked() && targetQueue.getConsumerCount()
> 0) {
ActiveMQAMQPProtocolLogger.LOGGER.unackWithConsumer(targetQueue.getConsumerCount(),
targetQueue.getName(), nodeID, messageID);
} else {
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index 9b80d5b66f..c872e95ab0 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
@@ -44,6 +45,13 @@ public class MQTTSessionCallback implements SessionCallback {
return connection.isWritable(callback);
}
+
+ @Override
+ public void failConnection(String errorMessage) {
+ connection.fail(new ActiveMQException(errorMessage));
+ }
+
+
@Override
public int sendMessage(MessageReference ref,
ServerConsumer consumer,
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 4b35b2e656..d44748b73f 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.AutoCreateResult;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
@@ -283,6 +284,12 @@ public class AMQSession implements SessionCallback {
return connection.isWritable(callback);
}
+ @Override
+ public void failConnection(String errorMessage) {
+ connection.fail(new ActiveMQException(errorMessage));
+ }
+
+
@Override
public void sendProducerCreditsMessage(int credits, SimpleString address) {
// TODO Auto-generated method stub
diff --git
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index dd85ac6bd2..bac854868b 100644
---
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import io.netty.channel.EventLoop;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
@@ -118,6 +119,12 @@ public class StompSession implements SessionCallback {
return connection.isWritable(callback);
}
+ @Override
+ public void failConnection(String errorMessage) {
+ connection.fail(new ActiveMQException(errorMessage));
+ }
+
+
void setServerSession(ServerSession session) {
this.session = session;
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index 1effd74b11..eb1b2c0a4c 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -1526,6 +1526,14 @@ public interface Configuration {
boolean isMirrorAckManagerWarnUnacked();
+ /**
+ * Should Mirror disconnect consumers in order to clean Acknowledgement
retries.
+ * This is useful in situations where you want consumers connected to only
one side of the Mirrors.
+ * */
+ Configuration setMirrorDisconnectConsumers(boolean disconnectConsumers);
+
+ boolean isMirrorDisconnectConsumers();
+
/**
* Should the system remove page folders once destinations stop paging.
* Default is false, however future major versions will have this as true
*/
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index 6c5738a13b..72d2a2ed53 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -466,6 +466,8 @@ public class ConfigurationImpl implements Configuration,
Serializable {
private boolean mirrorAckManagerWarnUnacked =
ActiveMQDefaultConfiguration.getMirrorAckManagerWarnUnacked();
+ private boolean mirrorDisconnectConsumers =
ActiveMQDefaultConfiguration.getMirrorAckManagerDisconnectConsumers();
+
private int mirrorAckManagerRetryDelay =
ActiveMQDefaultConfiguration.getMirrorAckManagerRetryDelay();
private boolean mirrorPageTransaction =
ActiveMQDefaultConfiguration.getMirrorPageTransaction();
@@ -3392,6 +3394,17 @@ public class ConfigurationImpl implements Configuration,
Serializable {
return this;
}
+ @Override
+ public ConfigurationImpl setMirrorDisconnectConsumers(boolean
disconnectConsumers) {
+ this.mirrorDisconnectConsumers = disconnectConsumers;
+ return this;
+ }
+
+ @Override
+ public boolean isMirrorDisconnectConsumers() {
+ return mirrorDisconnectConsumers;
+ }
+
@Override
public ConfigurationImpl setMirrorAckManagerQueueAttempts(int
minQueueAttempts) {
logger.debug("Setting mirrorAckManagerMinQueueAttempts = {}",
minQueueAttempts);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 8f100cf59a..dd250dc181 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -395,6 +395,7 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
private static final String MIRROR_ACK_MANAGER_RETRY_DELAY =
"mirror-ack-manager-retry-delay";
private static final String MIRROR_ACK_MANAGER_WARN_UNACKED =
"mirror-ack-manager-warn-unacked";
+ private static final String MIRROR_DISCONNECT_CONSUMERS =
"mirror-disconnect-consumers";
private static final String MIRROR_PAGE_TRANSACTION =
"mirror-page-transaction";
@@ -889,6 +890,8 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
config.setMirrorAckManagerWarnUnacked(getBoolean(e,
MIRROR_ACK_MANAGER_WARN_UNACKED, config.isMirrorAckManagerWarnUnacked()));
+ config.setMirrorDisconnectConsumers(getBoolean(e,
MIRROR_DISCONNECT_CONSUMERS, config.isMirrorDisconnectConsumers()));
+
parseAddressSettings(e, config);
parseResourceLimits(e, config);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
index b3487a54d0..c35187ccd1 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
@@ -260,6 +260,11 @@ public class ManagementRemotingConnection implements
RemotingConnection {
public void disconnect(ServerConsumer consumerId, String message) {
}
+ @Override
+ public void failConnection(String errorMessage) {
+
+ }
+
@Override
public boolean isWritable(ReadyListener callback, Object
protocolContext) {
return false;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
index 34eac07137..6cd9deb80d 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.protocol.core.Channel;
@@ -80,6 +81,13 @@ public final class CoreSessionCallback implements
SessionCallback {
return connection.isWritable(callback);
}
+
+ @Override
+ public void failConnection(String errorMessage) {
+ connection.fail(new ActiveMQException(errorMessage));
+ }
+
+
@Override
public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer,
MessageReference ref, boolean failed) {
return false;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
index 79b97b0834..0666a494ec 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
@@ -84,6 +84,13 @@ public interface Consumer extends PriorityAware {
*/
void disconnect();
+ /**
+ * fail the connection behind this consumer with the given error message (the default implementation does nothing)
+ */
+ default void failConnection(String errorMessage) {
+ }
+
+
void failed(Throwable t);
/**
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 061a32927b..0da22f18b3 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -167,6 +167,9 @@ public interface Queue extends Bindable, CriticalComponent {
void addConsumer(Consumer consumer) throws Exception;
+ default void forEachConsumer(java.util.function.Consumer<Consumer>
callback) {
+ }
+
void addLingerSession(String sessionId);
void removeLingerSession(String sessionId);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index aefcba2564..6f88e57430 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1228,6 +1228,11 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
}
}
+ @Override
+ public void forEachConsumer(java.util.function.Consumer<Consumer> callback)
{
+ consumers.stream().forEach(t -> callback.accept(t.consumer));
+ }
+
@Override
public void addLingerSession(String sessionId) {
lingerSessionIds.add(sessionId);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 167d9986aa..8a4d56987b 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -1166,6 +1166,11 @@ public class ServerConsumerImpl implements
ServerConsumer, ReadyListener {
callback.disconnect(this, "Queue deleted: " + getQueue().getName());
}
+ @Override
+ public void failConnection(String errorMessage) {
+ callback.failConnection(errorMessage);
+ }
+
@Override
public void failed(Throwable t) {
try {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
index ad504739cf..75b81a8645 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
@@ -79,6 +79,8 @@ public interface SessionCallback {
boolean isWritable(ReadyListener callback, Object protocolContext);
+ void failConnection(String errorMessage);
+
/**
* Some protocols (Openwire) needs a special message with the browser is
finished.
*/
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 5e87d2e00f..2e9baeb018 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -998,6 +998,14 @@
</xsd:annotation>
</xsd:element>
+ <xsd:element name="mirror-disconnect-consumers" type="xsd:boolean"
maxOccurs="1" minOccurs="0" default="false">
+ <xsd:annotation>
+ <xsd:documentation>
+ Should mirror disconnect consumers if needed to proceed with
acks to minimize chances of a missing ack.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+
<xsd:element name="suppress-session-notifications" type="xsd:boolean"
default="false" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index b59bc1ccbb..9823853988 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -597,6 +597,7 @@ public class FileConfigurationTest extends
AbstractConfigurationTestBase {
assertEquals(111, configInstance.getMirrorAckManagerQueueAttempts());
assertTrue(configInstance.isMirrorAckManagerWarnUnacked());
+ assertTrue(configInstance.isMirrorDisconnectConsumers());
assertEquals(222, configInstance.getMirrorAckManagerPageAttempts());
assertEquals(333, configInstance.getMirrorAckManagerRetryDelay());
assertTrue(configInstance.isMirrorPageTransaction());
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index b9935e85bb..45e9c42401 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -601,6 +601,7 @@
<mirror-ack-manager-retry-delay>333</mirror-ack-manager-retry-delay>
<mirror-ack-manager-warn-unacked>true</mirror-ack-manager-warn-unacked>
<mirror-page-transaction>true</mirror-page-transaction>
+ <mirror-disconnect-consumers>true</mirror-disconnect-consumers>
<security-settings>
<security-setting match="a1">
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
index 803eed9747..80436e0dc2 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
@@ -79,6 +79,7 @@
<mirror-ack-manager-retry-delay>333</mirror-ack-manager-retry-delay>
<mirror-ack-manager-warn-unacked>true</mirror-ack-manager-warn-unacked>
<mirror-page-transaction>true</mirror-page-transaction>
+ <mirror-disconnect-consumers>true</mirror-disconnect-consumers>
<remoting-incoming-interceptors>
<class-name>org.apache.activemq.artemis.tests.unit.core.config.impl.TestInterceptor1</class-name>
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
index eec3aae8e7..ad1bda4628 100644
---
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
+++
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
@@ -79,6 +79,7 @@
<mirror-ack-manager-retry-delay>333</mirror-ack-manager-retry-delay>
<mirror-ack-manager-warn-unacked>true</mirror-ack-manager-warn-unacked>
<mirror-page-transaction>true</mirror-page-transaction>
+ <mirror-disconnect-consumers>true</mirror-disconnect-consumers>
<xi:include
href="${xincludePath}/ConfigurationTest-xinclude-schema-config-remoting-incoming-interceptors.xml"/>
diff --git
a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
index 022293328a..1a6954e4a3 100644
---
a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
+++
b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
@@ -154,7 +154,6 @@ public class FakeConsumer implements Consumer {
@Override
public void disconnect() {
- //To change body of implemented methods use File | Settings | File
Templates.
}
@Override
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/DisconnectConsumerMirrorTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/DisconnectConsumerMirrorTest.java
new file mode 100644
index 0000000000..e968de79f0
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/DisconnectConsumerMirrorTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Mirror test for {@code Configuration.setMirrorDisconnectConsumers(true)}: a consumer still holding a message
+ * that has already been consumed on the mirrored broker is expected to be disconnected during the mirror ACK retry.
+ */
+public class DisconnectConsumerMirrorTest extends ActiveMQTestBase {
+
+   private static final int NUMBER_OF_MESSAGES = 5;
+
+   ActiveMQServer server1;
+   ActiveMQServer server2;
+
+   /** Starts two brokers mirroring each other and runs the same scenario once per client protocol. */
+   @Test
+   public void testDisconnectConsumers() throws Exception {
+      try {
+         String queueName = getName();
+
+         Configuration configuration1 = createDefaultConfig(0, false);
+         configuration1.getAddressConfigurations().clear();
+         configureMirror(configuration1, queueName, "tcp://localhost:61616", "toDC2", "tcp://localhost:61617", new AMQPMirrorBrokerConnectionElement().setDurable(true));
+         server1 = createServer(true, configuration1);
+         server1.setIdentity("server1");
+         server1.start();
+
+         Configuration configuration2 = createDefaultConfig(1, false);
+         configureMirror(configuration2, queueName, "tcp://localhost:61617", "toDC1", "tcp://localhost:61616", new AMQPMirrorBrokerConnectionElement());
+         server2 = createServer(true, configuration2);
+         server2.setIdentity("server2");
+         server2.start();
+
+         validateProtocol("AMQP", queueName);
+         validateProtocol("CORE", queueName);
+         validateProtocol("OPENWIRE", queueName);
+      } finally {
+         // Either server may still be null if startup failed, and server2 must be stopped even when stopping server1 throws.
+         try {
+            if (server1 != null) {
+               server1.stop();
+            }
+         } finally {
+            if (server2 != null) {
+               server2.stop();
+            }
+            server1 = null;
+            server2 = null;
+         }
+      }
+   }
+
+   /** Applies the settings both brokers share: ACK manager retries, the test queue, the client acceptor and the mirror connection. */
+   private static void configureMirror(Configuration configuration, String queueName, String acceptorURI, String connectionName, String targetURI, AMQPMirrorBrokerConnectionElement mirror) throws Exception {
+      configuration.setMirrorDisconnectConsumers(true);
+      configuration.setResolveProtocols(true);
+      configuration.setMirrorAckManagerRetryDelay(100).setMirrorAckManagerPageAttempts(5).setMirrorAckManagerQueueAttempts(5);
+      configuration.addQueueConfiguration(QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST));
+      configuration.addAcceptorConfiguration("clients", acceptorURI);
+      AMQPBrokerConnectConfiguration brokerConnectConfiguration = new AMQPBrokerConnectConfiguration(connectionName, targetURI).setRetryInterval(100).setReconnectAttempts(-1);
+      brokerConnectConfiguration.addMirror(mirror);
+      configuration.addAMQPConnection(brokerConnectConfiguration);
+   }
+
+   /** Sends on server2, receives one message on server1 without committing, consumes everything on server2, then expects the server1 consumer to fail. */
+   private void validateProtocol(String protocol, String queueName) throws Exception {
+      Queue mirrorQueue2 = server2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_toDC1");
+      assertNotNull(mirrorQueue2);
+
+      Queue mirrorQueue1 = server1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_toDC2");
+      assertNotNull(mirrorQueue1);
+
+      ConnectionFactory factory1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
+      ConnectionFactory factory2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61617");
+
+      try (Connection connection = factory2.createConnection()) {
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         MessageProducer producer = session.createProducer(session.createQueue(queueName));
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            producer.send(session.createTextMessage("hello " + i));
+         }
+         session.commit();
+      }
+
+      try (Connection connection1 = factory1.createConnection(); Connection connection2 = factory2.createConnection()) {
+         connection1.start();
+         connection2.start();
+
+         // Received on a transacted session that is never committed.
+         Session session1 = connection1.createSession(true, Session.SESSION_TRANSACTED);
+         MessageConsumer consumerOnServer1 = session1.createConsumer(session1.createQueue(queueName));
+         assertNotNull(consumerOnServer1.receive(5000));
+
+         // The destination is created from session2 itself rather than borrowed from session1.
+         Session session2 = connection2.createSession(true, Session.SESSION_TRANSACTED);
+         MessageConsumer consumerOnServer2 = session2.createConsumer(session2.createQueue(queueName));
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            assertNotNull(consumerOnServer2.receive(5000));
+         }
+         session2.commit();
+
+         Wait.assertEquals(0, mirrorQueue1::getMessageCount);
+         Wait.assertEquals(0, mirrorQueue2::getMessageCount);
+         verifyNoMessages(server1, server2, queueName);
+
+         // Consumers on server1 are expected to have been disconnected, as requested through MirrorDisconnectConsumers.
+         Assertions.assertThrows(JMSException.class, () -> {
+            consumerOnServer1.receive(5000);
+         });
+      }
+   }
+
+   /** Asserts, through Wait, that both mirror queues and both copies of the test queue reach a message count of zero. */
+   private void verifyNoMessages(ActiveMQServer server1, ActiveMQServer server2, String queueName) throws Exception {
+      Queue queueServer1 = server1.locateQueue(queueName);
+      Queue queueServer2 = server2.locateQueue(queueName);
+
+      Queue mirrorQueue1 = server1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_toDC2");
+      Queue mirrorQueue2 = server2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_toDC1");
+
+      Wait.assertEquals(0L, mirrorQueue1::getMessageCount, 5000, 100);
+      Wait.assertEquals(0L, mirrorQueue2::getMessageCount, 5000, 100);
+      Wait.assertEquals(0L, queueServer1::getMessageCount, 5000, 100);
+      Wait.assertEquals(0L, queueServer2::getMessageCount, 5000, 100);
+   }
+}
\ No newline at end of file
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
index 6483f07ce9..b27519feb6 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
@@ -249,7 +249,6 @@ public class DummyServerConsumer implements ServerConsumer {
@Override
public void disconnect() {
-
}
@Override
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index 46901169b7..0c539622a3 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -519,6 +519,11 @@ public class HangConsumerTest extends ActiveMQTestBase {
return targetCallback.sendLargeMessageContinuation(consumer, body,
continues, requiresResponse);
}
+ @Override
+ public void failConnection(String errorMessage) {
+ targetCallback.failConnection(errorMessage);
+ }
+
@Override
public void closed() {
targetCallback.closed();
diff --git
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/LargeAccumulationTest.java
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/LargeAccumulationTest.java
index 612460ab59..b46dbee66a 100644
---
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/LargeAccumulationTest.java
+++
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/LargeAccumulationTest.java
@@ -20,6 +20,7 @@ package
org.apache.activemq.artemis.tests.soak.brokerConnection.mirror;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
+import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
@@ -46,6 +47,7 @@ import org.apache.activemq.artemis.util.ServerUtil;
import org.apache.activemq.artemis.utils.FileUtil;
import org.apache.activemq.artemis.utils.TestParameters;
import org.apache.activemq.artemis.utils.Wait;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -123,6 +125,7 @@ public class LargeAccumulationTest extends SoakTestBase {
brokerProperties.put("AMQPConnections." + connectionName + ".type",
AMQPBrokerConnectionAddressType.MIRROR.toString());
brokerProperties.put("AMQPConnections." + connectionName +
".connectionElements.mirror.sync", "false");
brokerProperties.put("largeMessageSync", "false");
+ brokerProperties.put("mirrorDisconnectConsumers", "true");
brokerProperties.put("pageSyncTimeout", "" +
TimeUnit.MILLISECONDS.toNanos(1));
brokerProperties.put("messageExpiryScanPeriod", "-1");
@@ -154,7 +157,6 @@ public class LargeAccumulationTest extends SoakTestBase {
+ "logger.db1.level=DEBUG"));
}
-
@BeforeAll
public static void createServers() throws Exception {
createServer(DC1_NODE_A, "mirror", DC2_NODEA_URI, 0);
@@ -262,9 +264,6 @@ public class LargeAccumulationTest extends SoakTestBase {
@Test
public void testLargeAccumulation() throws Exception {
- final boolean useTopic = true;
- final boolean useQueue = true;
-
AtomicInteger errors = new AtomicInteger(0);
// producers will have 2 sets of producers (queue and topic)
@@ -297,70 +296,57 @@ public class LargeAccumulationTest extends SoakTestBase {
}
}
+ Connection connectionOnServer2 = cfs[1].createConnection();
+ runAfter(connectionOnServer2::close);
+
+ Session sessionOnSrv2 = connectionOnServer2.createSession(true,
Session.SESSION_TRANSACTED);
+
+ MessageConsumer deadConsumer =
sessionOnSrv2.createConsumer(sessionOnSrv2.createQueue(QUEUE_NAME));
+
CountDownLatch doneTopic = null, doneQueue = null;
- if (useTopic) {
- doneTopic = send(service, errors, NUMBER_OF_THREADS, cfs[0],
NUMBER_OF_LARGE_MESSAGES, 10, SIZE_OF_LARGE_MESSAGE, largeTopic,
"LargeMessageTopic");
- }
- if (useQueue) {
- doneQueue = send(service, errors, NUMBER_OF_THREADS, cfs[0],
NUMBER_OF_LARGE_MESSAGES, 10, SIZE_OF_LARGE_MESSAGE, largeQueue,
"LargeMessageQueue");
- }
+ doneTopic = send(service, errors, NUMBER_OF_THREADS, cfs[0],
NUMBER_OF_LARGE_MESSAGES, 10, SIZE_OF_LARGE_MESSAGE, largeTopic,
"LargeMessageTopic");
+ doneQueue = send(service, errors, NUMBER_OF_THREADS, cfs[0],
NUMBER_OF_LARGE_MESSAGES, 10, SIZE_OF_LARGE_MESSAGE, largeQueue,
"LargeMessageQueue");
- if (useTopic) {
- assertTrue(doneTopic.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
- }
- if (useQueue) {
- assertTrue(doneQueue.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
- }
+ assertTrue(doneTopic.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
+ assertTrue(doneQueue.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
assertEquals(0, errors.get());
- if (useTopic) {
- doneTopic = send(service, errors, NUMBER_OF_THREADS, cfs[0],
NUMBER_OF_REGULAR_MESSAGES, 100, SIZE_OF_REGULAR_MESSAGE, largeTopic,
"MediumMessageTopic");
- }
- if (useQueue) {
- doneQueue = send(service, errors, NUMBER_OF_THREADS, cfs[0],
NUMBER_OF_REGULAR_MESSAGES, 100, SIZE_OF_REGULAR_MESSAGE, largeQueue,
"MediumMessageQueue");
- }
+ doneTopic = send(service, errors, NUMBER_OF_THREADS, cfs[0],
NUMBER_OF_REGULAR_MESSAGES, 100, SIZE_OF_REGULAR_MESSAGE, largeTopic,
"MediumMessageTopic");
+ doneQueue = send(service, errors, NUMBER_OF_THREADS, cfs[0],
NUMBER_OF_REGULAR_MESSAGES, 100, SIZE_OF_REGULAR_MESSAGE, largeQueue,
"MediumMessageQueue");
- if (useTopic) {
- assertTrue(doneTopic.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
- }
- if (useQueue) {
- assertTrue(doneQueue.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
- }
+ assertTrue(doneTopic.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
+ assertTrue(doneQueue.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
assertEquals(0, errors.get());
- matchMessageCounts(sm, (long) (NUMBER_OF_LARGE_MESSAGES +
NUMBER_OF_REGULAR_MESSAGES) * NUMBER_OF_THREADS, useTopic, useQueue, true);
+ matchMessageCounts(sm, (long) (NUMBER_OF_LARGE_MESSAGES +
NUMBER_OF_REGULAR_MESSAGES) * NUMBER_OF_THREADS, true);
- if (useQueue) {
- doneQueue = new CountDownLatch(NUMBER_OF_THREADS);
- consume(service, errors, NUMBER_OF_THREADS, cfs[0],
NUMBER_OF_LARGE_MESSAGES + NUMBER_OF_REGULAR_MESSAGES, 100, null, largeQueue,
doneQueue);
- }
- if (useTopic) {
- doneTopic = new CountDownLatch(NUMBER_OF_THREADS *
NUMBER_OF_SUBSCRIPTIONS);
- for (int i = 0; i < NUMBER_OF_SUBSCRIPTIONS; i++) {
- consume(service, errors, NUMBER_OF_THREADS, cfs[0],
NUMBER_OF_LARGE_MESSAGES + NUMBER_OF_REGULAR_MESSAGES, 100, "sub_" + i,
largeTopic, doneTopic);
- }
+ doneQueue = new CountDownLatch(NUMBER_OF_THREADS);
+ consume(service, errors, NUMBER_OF_THREADS, cfs[0],
NUMBER_OF_LARGE_MESSAGES + NUMBER_OF_REGULAR_MESSAGES, 100, null, largeQueue,
doneQueue);
+ doneTopic = new CountDownLatch(NUMBER_OF_THREADS *
NUMBER_OF_SUBSCRIPTIONS);
+ for (int i = 0; i < NUMBER_OF_SUBSCRIPTIONS; i++) {
+ consume(service, errors, NUMBER_OF_THREADS, cfs[0],
NUMBER_OF_LARGE_MESSAGES + NUMBER_OF_REGULAR_MESSAGES, 100, "sub_" + i,
largeTopic, doneTopic);
}
- if (useTopic) {
- assertTrue(doneTopic.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
- }
+ assertTrue(doneTopic.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
- if (useQueue) {
- assertTrue(doneQueue.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
- }
+ assertTrue(doneQueue.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
assertEquals(0, errors.get());
- matchMessageCounts(sm, 0, useTopic, useQueue, true);
+ matchMessageCounts(sm, 0, true);
+
+ // deadConsumer is expected to have been disconnected by the ack manager
+ Assertions.assertThrows(JMSException.class, () -> {
+ deadConsumer.receive(5000);
+ });
+
+ connectionOnServer2.close();
+
}
- private boolean matchMessageCounts(SimpleManagement[] sm,
- long numberOfMessages,
- boolean useTopic,
- boolean useQueue,
- boolean useWait) throws Exception {
+ private boolean matchMessageCounts(SimpleManagement[] sm, long
numberOfMessages, boolean useWait) throws Exception {
for (SimpleManagement s : sm) {
logger.debug("Checking counts on SNF for {}", s.getUri());
if (useWait) {
@@ -371,29 +357,25 @@ public class LargeAccumulationTest extends SoakTestBase {
}
}
- if (useTopic) {
- for (int i = 0; i < NUMBER_OF_SUBSCRIPTIONS; i++) {
- String subscriptionName = "sub_" + i + ":global";
- logger.debug("Checking counts on {} on {}", subscriptionName,
s.getUri());
- if (useWait) {
- Wait.assertEquals(numberOfMessages, () ->
s.getMessageCountOnQueue(subscriptionName),
TimeUnit.MINUTES.toMillis(LARGE_TIMEOUT_MINUTES), 100);
- } else {
- if (s.getMessageCountOnQueue(subscriptionName) !=
numberOfMessages) {
- return false;
- }
- }
- }
- }
-
- if (useQueue) {
+ for (int i = 0; i < NUMBER_OF_SUBSCRIPTIONS; i++) {
+ String subscriptionName = "sub_" + i + ":global";
+ logger.debug("Checking counts on {} on {}", subscriptionName,
s.getUri());
if (useWait) {
- Wait.assertEquals(numberOfMessages, () ->
s.getMessageCountOnQueue(QUEUE_NAME),
TimeUnit.MINUTES.toMillis(LARGE_TIMEOUT_MINUTES), 100);
+ Wait.assertEquals(numberOfMessages, () ->
s.getMessageCountOnQueue(subscriptionName),
TimeUnit.MINUTES.toMillis(LARGE_TIMEOUT_MINUTES), 100);
} else {
- if (s.getMessageCountOnQueue(QUEUE_NAME) != numberOfMessages) {
+ if (s.getMessageCountOnQueue(subscriptionName) !=
numberOfMessages) {
return false;
}
}
}
+
+ if (useWait) {
+ Wait.assertEquals(numberOfMessages, () ->
s.getMessageCountOnQueue(QUEUE_NAME),
TimeUnit.MINUTES.toMillis(LARGE_TIMEOUT_MINUTES), 100);
+ } else {
+ if (s.getMessageCountOnQueue(QUEUE_NAME) != numberOfMessages) {
+ return false;
+ }
+ }
}
return true;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact