This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1064f5e1929 [fix][test] Make ClusterMigrationTest.testClusterMigrationWithReplicationBacklog more robust (#25330)
1064f5e1929 is described below

commit 1064f5e192959b26ebf6076e5b6836e18f0d52e7
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Mar 19 01:51:12 2026 -0700

    [fix][test] Make ClusterMigrationTest.testClusterMigrationWithReplicationBacklog more robust (#25330)
---
 .../broker/service/ClusterMigrationTest.java       | 65 +++++++++++-----------
 1 file changed, 32 insertions(+), 33 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
index c5103bff5d5..6a9a6a84209 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
@@ -50,6 +50,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
@@ -451,28 +452,29 @@ public class ClusterMigrationTest {
         Consumer<byte[]> consumer3 = 
client3.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
                 .subscriptionName("s1").subscribe();
         AbstractTopic topic1 = (AbstractTopic) 
pulsar1.getBrokerService().getTopic(topicName, false).getNow(null).get();
-        retryStrategically((test) -> !topic1.getProducers().isEmpty(), 5, 500);
-        retryStrategically((test) -> !topic1.getSubscriptions().isEmpty(), 5, 
500);
-        assertFalse(topic1.getProducers().isEmpty());
-        assertFalse(topic1.getSubscriptions().isEmpty());
+
+        Awaitility.await()
+                .untilAsserted(() -> 
assertFalse(topic1.getProducers().isEmpty()));
+        Awaitility.await()
+                .untilAsserted(() -> 
assertFalse(topic1.getSubscriptions().isEmpty()));
 
         // build backlog
         consumer1.close();
-        retryStrategically((test) -> topic1.getReplicators().size() == 1, 10, 
3000);
-        assertEquals(topic1.getReplicators().size(), 1);
+        Awaitility.await().atMost(30, TimeUnit.SECONDS)
+                .untilAsserted(() -> 
assertEquals(topic1.getReplicators().size(), 1));
 
-       // stop service in the replication cluster to build replication backlog
+        // stop service in the replication cluster to build replication backlog
         broker3.stop();
-        retryStrategically((test) -> broker3.getPulsarService() == null, 10, 
1000);
-        assertNull(pulsar3.getBrokerService());
+        Awaitility.await()
+                .untilAsserted(() -> assertNull(broker3.getPulsarService()));
 
-        //publish messages into topic in "r1" cluster
+        // publish messages into topic in "r1" cluster
         int n = 5;
         for (int i = 0; i < n; i++) {
             producer1.send("test1".getBytes());
         }
-        retryStrategically((test) -> topic1.isReplicationBacklogExist(), 10, 
1000);
-        assertTrue(topic1.isReplicationBacklogExist());
+        Awaitility.await()
+                .untilAsserted(() -> 
assertTrue(topic1.isReplicationBacklogExist()));
 
         @Cleanup
         PulsarClient client2 = 
PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, 
TimeUnit.SECONDS)
@@ -484,7 +486,8 @@ public class ClusterMigrationTest {
         log.info("name of topic 2 - {}", topic2.getName());
         assertFalse(topic2.getProducers().isEmpty());
 
-        retryStrategically((test) -> topic2.getReplicators().size() == 1, 10, 
2000);
+        Awaitility.await().atMost(20, TimeUnit.SECONDS)
+                .untilAsserted(() -> 
assertEquals(topic2.getReplicators().size(), 1));
         log.info("replicators should be ready");
 
         ClusterUrl migratedUrl = new 
ClusterUrl(pulsar2.getWebServiceAddress(), pulsar2.getWebServiceAddressTls(),
@@ -492,23 +495,15 @@ public class ClusterMigrationTest {
         admin1.clusters().updateClusterMigration("r1", true, migratedUrl);
         log.info("update cluster migration called");
 
-        retryStrategically((test) -> {
-            try {
-                topic1.checkClusterMigration().get();
-                return true;
-            } catch (Exception e) {
-                // ok
-            }
-            return false;
-        }, 10, 500);
-
-        topic1.checkClusterMigration().get();
+        Awaitility.await().untilAsserted(() -> {
+            topic1.checkClusterMigration().get();
+        });
 
         producer1.sendAsync("test1".getBytes());
 
         // producer is disconnected from cluster-1
-        retryStrategically((test) -> topic1.getProducers().isEmpty(), 10, 500);
-        assertTrue(topic1.getProducers().isEmpty());
+        Awaitility.await()
+                .untilAsserted(() -> 
assertTrue(topic1.getProducers().isEmpty()));
 
         // verify that the disconnected producer is not redirected
         // to replication cluster since there is replication backlog.
@@ -516,20 +511,24 @@ public class ClusterMigrationTest {
 
         // Restart the service in cluster "r3".
         broker3.restart();
-        retryStrategically((test) -> broker3.getPulsarService() != null, 10, 
1000);
-        assertNotNull(broker3.getPulsarService());
+        Awaitility.await().atMost(30, TimeUnit.SECONDS)
+                .untilAsserted(() -> 
assertNotNull(broker3.getPulsarService()));
         pulsar3 = broker3.getPulsarService();
 
         // verify that the replication backlog drains once service in cluster "r3" is restarted.
-        retryStrategically((test) -> !topic1.isReplicationBacklogExist(), 10, 
1000);
-        assertFalse(topic1.isReplicationBacklogExist());
+        // The replicator from r1 needs to reconnect to the restarted broker3, which may take time
+        // as the new broker's load manager channel needs to be fully initialized.
+        Awaitility.await().atMost(60, TimeUnit.SECONDS).pollInterval(1, 
TimeUnit.SECONDS)
+                .untilAsserted(() -> 
assertFalse(topic1.isReplicationBacklogExist()));
 
         // verify that the producer1 is now connected to migrated cluster "r2" since backlog is cleared.
         topic1.checkClusterMigration().get();
 
-        // verify that the producer1 is now is now connected to migrated cluster "r2" since backlog is cleared.
-        retryStrategically((test) -> topic2.getProducers().size() == 2, 10, 
500);
-        assertEquals(topic2.getProducers().size(), 2);
+        // verify that the producer1 is now connected to migrated cluster "r2" since backlog is cleared.
+        // The producer reconnection uses exponential backoff after TopicMigratedException, so
+        // it may take a while for the producer to reconnect to the migrated cluster.
+        Awaitility.await().atMost(90, TimeUnit.SECONDS).pollInterval(1, 
TimeUnit.SECONDS)
+                .untilAsserted(() -> 
assertEquals(topic2.getProducers().size(), 2));
 
         client1.close();
         client2.close();

Reply via email to