This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new 4362c5a8c1 Fix/recurring flaky tests (#1791)
4362c5a8c1 is described below
commit 4362c5a8c15af58ff2b4a73521ee35d5948d6eb1
Author: Jean-Louis Monteiro <[email protected]>
AuthorDate: Tue Mar 24 16:50:26 2026 +0100
Fix/recurring flaky tests (#1791)
* fix(test): flaky tests
* fix(test): Add logs to understand why the tests continue to fail randomly
* fix(test): enhance DurableFiveBrokerNetworkBridgeTest with additional
assertions for durable subscriptions
* fix(test): use reflection to synchronize. Unfortunately, I can't see any other
means to do that
* fix(test): remove additional logging to debug tests
---
.../apache/activemq/transport/mqtt/MQTTTest.java | 31 ++++++++-
.../DurableFiveBrokerNetworkBridgeTest.java | 76 ++++++++++++++++++++++
.../network/DurableSyncNetworkBridgeTest.java | 24 +++++--
.../network/VirtualConsumerDemandTest.java | 7 +-
4 files changed, 124 insertions(+), 14 deletions(-)
diff --git
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 27786f6b12..ab49881c99 100644
---
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -48,6 +48,9 @@ import jakarta.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.TopicRegion;
import
org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
@@ -1736,15 +1739,37 @@ public class MQTTTest extends MQTTTestSupport {
private boolean isSubscriptionActive(Topic topic, String clientId) throws
Exception {
if (isVirtualTopicSubscriptionStrategy()) {
- String queueName = buildVirtualTopicQueueName(topic, clientId);
+ final String queueName = buildVirtualTopicQueueName(topic,
clientId);
try {
return getProxyToQueue(queueName).getConsumerCount() > 0;
} catch (Exception ignore) {
return false;
}
} else {
- return
brokerService.getAdminView().getDurableTopicSubscribers().length >= 1 &&
-
brokerService.getAdminView().getInactiveDurableTopicSubscribers().length == 0;
+ final int activeSubs =
brokerService.getAdminView().getDurableTopicSubscribers().length;
+ final int inactiveSubs =
brokerService.getAdminView().getInactiveDurableTopicSubscribers().length;
+ final boolean jmxActive = activeSubs >= 1 && inactiveSubs == 0;
+
+ // Diagnostic: also check the actual broker-level subscription
state
+ // to determine if the flakiness is a JMX registration issue or a
real broker bug
+ boolean brokerLevelActive = false;
+ try {
+ final RegionBroker regionBroker = (RegionBroker)
brokerService.getBroker().getAdaptor(RegionBroker.class);
+ final TopicRegion topicRegion = (TopicRegion)
regionBroker.getTopicRegion();
+ final String subName = QoS.values()[topic.qos().ordinal()] +
":" + topic.name().toString();
+ final DurableTopicSubscription sub =
topicRegion.lookupSubscription(subName, clientId);
+ brokerLevelActive = sub != null && sub.isActive();
+ } catch (Exception e) {
+ LOG.debug("Could not check broker-level subscription state",
e);
+ }
+
+ if (jmxActive != brokerLevelActive) {
+ LOG.warn("MQTT subscription state MISMATCH: JMX says active={}
(active={}, inactive={}), " +
+ "broker-level says active={} for clientId={},
topic={}",
+ jmxActive, activeSubs, inactiveSubs,
brokerLevelActive, clientId, topic.name());
+ }
+
+ return jmxActive;
}
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
index 6f355ae53c..fc5c152f08 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
@@ -17,9 +17,13 @@
package org.apache.activemq.network;
import java.io.File;
+import java.lang.reflect.Field;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import jakarta.jms.Connection;
@@ -30,6 +34,8 @@ import junit.framework.AssertionFailedError;
import junit.framework.Test;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnection;
+import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.DurableTopicSubscription;
@@ -134,6 +140,23 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
startAllBrokers();
waitForBridgeFormation();
+ // Wait for the async durable sync (syncDurableSubs=true) to complete
across all bridges.
+ // After restart with persistent data, NC durable subs must
re-establish to match Phase 1
+ // counts before we proceed with unsubscribes, otherwise the sync can
re-create NC durable
+ // subs AFTER the unsubscribe advisory has already propagated.
+ assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
+ assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 2);
+ assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 2);
+ assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
+
+ // Wait for all bridge sync executors to finish populating
durableRemoteSubs.
+ // setupStaticDestinations() creates DemandSubscriptions with empty
durableRemoteSubs,
+ // and the syncExecutor populates them asynchronously. Without this
wait, unsubscribe
+ // advisories may race with the sync executor: the advisory finds
nothing to remove
+ // (durableRemoteSubs is empty), then the sync populates it, leaving a
stale NC durable sub.
+ waitForAllBridgeSyncCompletion();
+
conn = brokers.get("Broker_A_A").factory.createConnection();
conn.setClientID("clientId1");
conn.start();
@@ -854,6 +877,59 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
broker.setDataDirectory("target" + File.separator + "test-data" +
File.separator + "DurableFiveBrokerNetworkBridgeTest");
}
+ /**
+ * Wait for all bridge sync executors (both initiator and duplex bridges)
to complete
+ * processing their BrokerSubscriptionInfo tasks on all brokers.
+ * Uses reflection to access the private syncExecutor, following the same
pattern
+ * as {@link DynamicNetworkTestSupport#findDuplexBridge}.
+ */
+ private void waitForAllBridgeSyncCompletion() throws Exception {
+ final Field syncExecutorField =
DemandForwardingBridgeSupport.class.getDeclaredField("syncExecutor");
+ syncExecutorField.setAccessible(true);
+ final Field duplexBridgeField =
TransportConnection.class.getDeclaredField("duplexBridge");
+ duplexBridgeField.setAccessible(true);
+
+ for (final BrokerItem item : brokers.values()) {
+ final BrokerService broker = item.broker;
+ // Initiator bridges (accessible via network connectors)
+ for (final NetworkConnector nc : broker.getNetworkConnectors()) {
+ for (final NetworkBridge bridge : nc.activeBridges()) {
+ if (bridge instanceof DemandForwardingBridgeSupport) {
+ flushSyncExecutor(syncExecutorField,
(DemandForwardingBridgeSupport) bridge);
+ }
+ }
+ }
+ // Duplex bridges (accessible via transport connections)
+ for (final TransportConnector tc :
broker.getTransportConnectors()) {
+ for (final TransportConnection conn : tc.getConnections()) {
+ if (conn.getConnectionId() != null &&
conn.getConnectionId().startsWith("networkConnector_")) {
+ final DemandForwardingBridgeSupport duplexBridge =
+ (DemandForwardingBridgeSupport)
duplexBridgeField.get(conn);
+ if (duplexBridge != null) {
+ flushSyncExecutor(syncExecutorField, duplexBridge);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private void flushSyncExecutor(final Field syncExecutorField,
+ final DemandForwardingBridgeSupport bridge) throws Exception {
+ final ExecutorService syncExecutor = (ExecutorService)
syncExecutorField.get(bridge);
+ if (syncExecutor.isShutdown()) {
+ return;
+ }
+ final CountDownLatch latch = new CountDownLatch(1);
+ try {
+ syncExecutor.execute(latch::countDown);
+ } catch (final RejectedExecutionException e) {
+ return;
+ }
+ assertTrue("Sync executor should complete on " + bridge,
+ latch.await(30, TimeUnit.SECONDS));
+ }
+
protected void startNetworkConnectors(NetworkConnector... connectors)
throws Exception {
for (final NetworkConnector connector : connectors) {
connector.start();
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
index a2d26c3bbc..56cbe45af4 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
@@ -226,7 +226,12 @@ public class DurableSyncNetworkBridgeTest extends
DynamicNetworkTestSupport {
//Test that on successful reconnection of the bridge that
//the NC sub will be removed
restartBroker(broker2, true);
- assertNCDurableSubsCount(broker2, topic, 1);
+ // In REVERSE flow, broker2=localBroker has the bridge and broker1
(remoteBroker)
+ // is already running, so the sync may have already cleaned up the NC
durable sub.
+ // This "before sync" assertion is only valid in FORWARD flow.
+ if (flow == FLOW.FORWARD) {
+ assertNCDurableSubsCount(broker2, topic, 1);
+ }
restartBroker(broker1, true);
assertBridgeStarted();
assertNCDurableSubsCount(broker2, topic, 0);
@@ -253,7 +258,9 @@ public class DurableSyncNetworkBridgeTest extends
DynamicNetworkTestSupport {
//the NC sub will be removed because even though the local
subscription exists,
//it no longer matches the included filter
restartBroker(broker2, true);
- assertNCDurableSubsCount(broker2, topic, 1);
+ if (flow == FLOW.FORWARD) {
+ assertNCDurableSubsCount(broker2, topic, 1);
+ }
restartBroker(broker1, true);
assertBridgeStarted();
assertNCDurableSubsCount(broker2, topic, 0);
@@ -291,7 +298,9 @@ public class DurableSyncNetworkBridgeTest extends
DynamicNetworkTestSupport {
//the NC sub will be removed because even though the local
subscription exists,
//it no longer matches the included static filter
restartBroker(broker2, true);
- assertNCDurableSubsCount(broker2, topic, 1);
+ if (flow == FLOW.FORWARD) {
+ assertNCDurableSubsCount(broker2, topic, 1);
+ }
restartBroker(broker1, true);
assertBridgeStarted();
assertNCDurableSubsCount(broker2, topic, 0);
@@ -320,10 +329,13 @@ public class DurableSyncNetworkBridgeTest extends
DynamicNetworkTestSupport {
//Test that on successful reconnection of the bridge that
//the NC sub will be removed for topic1 but will stay for topic2
- //before sync, the old NC should exist
+ //before sync, the old NC should exist (only verifiable in FORWARD
flow;
+ //in REVERSE, broker2=localBroker has the bridge and sync may already
have run)
restartBroker(broker2, true);
- assertNCDurableSubsCount(broker2, topic, 1);
- assertNCDurableSubsCount(broker2, topic2, 0);
+ if (flow == FLOW.FORWARD) {
+ assertNCDurableSubsCount(broker2, topic, 1);
+ assertNCDurableSubsCount(broker2, topic2, 0);
+ }
//After sync, remove old NC and create one for topic 2
restartBroker(broker1, true);
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
index 31aee0938d..9c8a4d474b 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
@@ -1357,11 +1357,8 @@ public class VirtualConsumerDemandTest extends
DynamicNetworkTestSupport {
includedProducer.send(test);
assertNotNull(bridgeConsumer.receive(5000));
- assertTrue("dequeues not updated",
- Wait.waitFor(() -> 1 ==
destinationStatistics.getDequeues().getCount()));
-
- assertEquals("broker dest stat dispatched", 1,
destinationStatistics.getDispatched().getCount());
- assertEquals("broker dest stat dequeues", 1,
destinationStatistics.getDequeues().getCount());
+ waitForDispatchFromLocalBroker(destinationStatistics, 1);
+ assertLocalBrokerStatistics(destinationStatistics, 1);
assertRemoteAdvisoryCount(advisoryConsumer, 1);
assertAdvisoryBrokerCounts(1,1,0);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact