This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new 4362c5a8c1 Fix/recuring flaky tests (#1791)
4362c5a8c1 is described below

commit 4362c5a8c15af58ff2b4a73521ee35d5948d6eb1
Author: Jean-Louis Monteiro <[email protected]>
AuthorDate: Tue Mar 24 16:50:26 2026 +0100

    Fix/recuring flaky tests (#1791)
    
    * fix(test): flaky tests
    
    * fix(test): Add logs to understand why the tests continue to fail randomly
    
    * fix(test): enhance DurableFiveBrokerNetworkBridgeTest with additional 
assertions for durable subscriptions
    
    * fix(test): use reflection to syncrhonize. Unfortunatly I can't see other 
means to do that
    
    * fix(test): remove additional logging to debug tests
---
 .../apache/activemq/transport/mqtt/MQTTTest.java   | 31 ++++++++-
 .../DurableFiveBrokerNetworkBridgeTest.java        | 76 ++++++++++++++++++++++
 .../network/DurableSyncNetworkBridgeTest.java      | 24 +++++--
 .../network/VirtualConsumerDemandTest.java         |  7 +-
 4 files changed, 124 insertions(+), 14 deletions(-)

diff --git 
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java 
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 27786f6b12..ab49881c99 100644
--- 
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ 
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -48,6 +48,9 @@ import jakarta.jms.TextMessage;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.TopicRegion;
 import 
org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
@@ -1736,15 +1739,37 @@ public class MQTTTest extends MQTTTestSupport {
 
     private boolean isSubscriptionActive(Topic topic, String clientId) throws 
Exception {
         if (isVirtualTopicSubscriptionStrategy()) {
-            String queueName = buildVirtualTopicQueueName(topic, clientId);
+            final String queueName = buildVirtualTopicQueueName(topic, 
clientId);
             try {
                 return getProxyToQueue(queueName).getConsumerCount() > 0;
             } catch (Exception ignore) {
                 return false;
             }
         } else {
-            return 
brokerService.getAdminView().getDurableTopicSubscribers().length >= 1 &&
-                   
brokerService.getAdminView().getInactiveDurableTopicSubscribers().length == 0;
+            final int activeSubs = 
brokerService.getAdminView().getDurableTopicSubscribers().length;
+            final int inactiveSubs = 
brokerService.getAdminView().getInactiveDurableTopicSubscribers().length;
+            final boolean jmxActive = activeSubs >= 1 && inactiveSubs == 0;
+
+            // Diagnostic: also check the actual broker-level subscription 
state
+            // to determine if the flakiness is a JMX registration issue or a 
real broker bug
+            boolean brokerLevelActive = false;
+            try {
+                final RegionBroker regionBroker = (RegionBroker) 
brokerService.getBroker().getAdaptor(RegionBroker.class);
+                final TopicRegion topicRegion = (TopicRegion) 
regionBroker.getTopicRegion();
+                final String subName = QoS.values()[topic.qos().ordinal()] + 
":" + topic.name().toString();
+                final DurableTopicSubscription sub = 
topicRegion.lookupSubscription(subName, clientId);
+                brokerLevelActive = sub != null && sub.isActive();
+            } catch (Exception e) {
+                LOG.debug("Could not check broker-level subscription state", 
e);
+            }
+
+            if (jmxActive != brokerLevelActive) {
+                LOG.warn("MQTT subscription state MISMATCH: JMX says active={} 
(active={}, inactive={}), " +
+                        "broker-level says active={} for clientId={}, 
topic={}",
+                        jmxActive, activeSubs, inactiveSubs, 
brokerLevelActive, clientId, topic.name());
+            }
+
+            return jmxActive;
         }
     }
 
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
index 6f355ae53c..fc5c152f08 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
@@ -17,9 +17,13 @@
 package org.apache.activemq.network;
 
 import java.io.File;
+import java.lang.reflect.Field;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import jakarta.jms.Connection;
@@ -30,6 +34,8 @@ import junit.framework.AssertionFailedError;
 import junit.framework.Test;
 import org.apache.activemq.JmsMultipleBrokersTestSupport;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnection;
+import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationFilter;
 import org.apache.activemq.broker.region.DurableTopicSubscription;
@@ -134,6 +140,23 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
         startAllBrokers();
         waitForBridgeFormation();
 
+        // Wait for the async durable sync (syncDurableSubs=true) to complete 
across all bridges.
+        // After restart with persistent data, NC durable subs must 
re-establish to match Phase 1
+        // counts before we proceed with unsubscribes, otherwise the sync can 
re-create NC durable
+        // subs AFTER the unsubscribe advisory has already propagated.
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
+
+        // Wait for all bridge sync executors to finish populating 
durableRemoteSubs.
+        // setupStaticDestinations() creates DemandSubscriptions with empty 
durableRemoteSubs,
+        // and the syncExecutor populates them asynchronously. Without this 
wait, unsubscribe
+        // advisories may race with the sync executor: the advisory finds 
nothing to remove
+        // (durableRemoteSubs is empty), then the sync populates it, leaving a 
stale NC durable sub.
+        waitForAllBridgeSyncCompletion();
+
         conn = brokers.get("Broker_A_A").factory.createConnection();
         conn.setClientID("clientId1");
         conn.start();
@@ -854,6 +877,59 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
         broker.setDataDirectory("target" + File.separator + "test-data" + 
File.separator + "DurableFiveBrokerNetworkBridgeTest");
     }
 
+    /**
+     * Wait for all bridge sync executors (both initiator and duplex bridges) 
to complete
+     * processing their BrokerSubscriptionInfo tasks on all brokers.
+     * Uses reflection to access the private syncExecutor, following the same 
pattern
+     * as {@link DynamicNetworkTestSupport#findDuplexBridge}.
+     */
+    private void waitForAllBridgeSyncCompletion() throws Exception {
+        final Field syncExecutorField = 
DemandForwardingBridgeSupport.class.getDeclaredField("syncExecutor");
+        syncExecutorField.setAccessible(true);
+        final Field duplexBridgeField = 
TransportConnection.class.getDeclaredField("duplexBridge");
+        duplexBridgeField.setAccessible(true);
+
+        for (final BrokerItem item : brokers.values()) {
+            final BrokerService broker = item.broker;
+            // Initiator bridges (accessible via network connectors)
+            for (final NetworkConnector nc : broker.getNetworkConnectors()) {
+                for (final NetworkBridge bridge : nc.activeBridges()) {
+                    if (bridge instanceof DemandForwardingBridgeSupport) {
+                        flushSyncExecutor(syncExecutorField, 
(DemandForwardingBridgeSupport) bridge);
+                    }
+                }
+            }
+            // Duplex bridges (accessible via transport connections)
+            for (final TransportConnector tc : 
broker.getTransportConnectors()) {
+                for (final TransportConnection conn : tc.getConnections()) {
+                    if (conn.getConnectionId() != null && 
conn.getConnectionId().startsWith("networkConnector_")) {
+                        final DemandForwardingBridgeSupport duplexBridge =
+                            (DemandForwardingBridgeSupport) 
duplexBridgeField.get(conn);
+                        if (duplexBridge != null) {
+                            flushSyncExecutor(syncExecutorField, duplexBridge);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private void flushSyncExecutor(final Field syncExecutorField,
+            final DemandForwardingBridgeSupport bridge) throws Exception {
+        final ExecutorService syncExecutor = (ExecutorService) 
syncExecutorField.get(bridge);
+        if (syncExecutor.isShutdown()) {
+            return;
+        }
+        final CountDownLatch latch = new CountDownLatch(1);
+        try {
+            syncExecutor.execute(latch::countDown);
+        } catch (final RejectedExecutionException e) {
+            return;
+        }
+        assertTrue("Sync executor should complete on " + bridge,
+            latch.await(30, TimeUnit.SECONDS));
+    }
+
     protected void startNetworkConnectors(NetworkConnector... connectors) 
throws Exception {
         for (final NetworkConnector connector : connectors) {
             connector.start();
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
index a2d26c3bbc..56cbe45af4 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
@@ -226,7 +226,12 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         //Test that on successful reconnection of the bridge that
         //the NC sub will be removed
         restartBroker(broker2, true);
-        assertNCDurableSubsCount(broker2, topic, 1);
+        // In REVERSE flow, broker2=localBroker has the bridge and broker1 
(remoteBroker)
+        // is already running, so the sync may have already cleaned up the NC 
durable sub.
+        // This "before sync" assertion is only valid in FORWARD flow.
+        if (flow == FLOW.FORWARD) {
+            assertNCDurableSubsCount(broker2, topic, 1);
+        }
         restartBroker(broker1, true);
         assertBridgeStarted();
         assertNCDurableSubsCount(broker2, topic, 0);
@@ -253,7 +258,9 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         //the NC sub will be removed because even though the local 
subscription exists,
         //it no longer matches the included filter
         restartBroker(broker2, true);
-        assertNCDurableSubsCount(broker2, topic, 1);
+        if (flow == FLOW.FORWARD) {
+            assertNCDurableSubsCount(broker2, topic, 1);
+        }
         restartBroker(broker1, true);
         assertBridgeStarted();
         assertNCDurableSubsCount(broker2, topic, 0);
@@ -291,7 +298,9 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         //the NC sub will be removed because even though the local 
subscription exists,
         //it no longer matches the included static filter
         restartBroker(broker2, true);
-        assertNCDurableSubsCount(broker2, topic, 1);
+        if (flow == FLOW.FORWARD) {
+            assertNCDurableSubsCount(broker2, topic, 1);
+        }
         restartBroker(broker1, true);
         assertBridgeStarted();
         assertNCDurableSubsCount(broker2, topic, 0);
@@ -320,10 +329,13 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         //Test that on successful reconnection of the bridge that
         //the NC sub will be removed for topic1 but will stay for topic2
 
-        //before sync, the old NC should exist
+        //before sync, the old NC should exist (only verifiable in FORWARD 
flow;
+        //in REVERSE, broker2=localBroker has the bridge and sync may already 
have run)
         restartBroker(broker2, true);
-        assertNCDurableSubsCount(broker2, topic, 1);
-        assertNCDurableSubsCount(broker2, topic2, 0);
+        if (flow == FLOW.FORWARD) {
+            assertNCDurableSubsCount(broker2, topic, 1);
+            assertNCDurableSubsCount(broker2, topic2, 0);
+        }
 
         //After sync, remove old NC and create one for topic 2
         restartBroker(broker1, true);
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
index 31aee0938d..9c8a4d474b 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
@@ -1357,11 +1357,8 @@ public class VirtualConsumerDemandTest extends 
DynamicNetworkTestSupport {
         includedProducer.send(test);
         assertNotNull(bridgeConsumer.receive(5000));
 
-        assertTrue("dequeues not updated",
-                Wait.waitFor(() -> 1 == 
destinationStatistics.getDequeues().getCount()));
-
-        assertEquals("broker dest stat dispatched", 1, 
destinationStatistics.getDispatched().getCount());
-        assertEquals("broker dest stat dequeues", 1, 
destinationStatistics.getDequeues().getCount());
+        waitForDispatchFromLocalBroker(destinationStatistics, 1);
+        assertLocalBrokerStatistics(destinationStatistics, 1);
 
         assertRemoteAdvisoryCount(advisoryConsumer, 1);
         assertAdvisoryBrokerCounts(1,1,0);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to