This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1064f5e1929 [fix][test] Make ClusterMigrationTest.testClusterMigrationWithReplicationBacklog more robust (#25330)
1064f5e1929 is described below
commit 1064f5e192959b26ebf6076e5b6836e18f0d52e7
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Mar 19 01:51:12 2026 -0700
[fix][test] Make ClusterMigrationTest.testClusterMigrationWithReplicationBacklog more robust (#25330)
---
.../broker/service/ClusterMigrationTest.java | 65 +++++++++++-----------
1 file changed, 32 insertions(+), 33 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
index c5103bff5d5..6a9a6a84209 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
@@ -50,6 +50,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
@@ -451,28 +452,29 @@ public class ClusterMigrationTest {
Consumer<byte[]> consumer3 =
client3.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
.subscriptionName("s1").subscribe();
AbstractTopic topic1 = (AbstractTopic)
pulsar1.getBrokerService().getTopic(topicName, false).getNow(null).get();
- retryStrategically((test) -> !topic1.getProducers().isEmpty(), 5, 500);
- retryStrategically((test) -> !topic1.getSubscriptions().isEmpty(), 5,
500);
- assertFalse(topic1.getProducers().isEmpty());
- assertFalse(topic1.getSubscriptions().isEmpty());
+
+ Awaitility.await()
+ .untilAsserted(() ->
assertFalse(topic1.getProducers().isEmpty()));
+ Awaitility.await()
+ .untilAsserted(() ->
assertFalse(topic1.getSubscriptions().isEmpty()));
// build backlog
consumer1.close();
- retryStrategically((test) -> topic1.getReplicators().size() == 1, 10,
3000);
- assertEquals(topic1.getReplicators().size(), 1);
+ Awaitility.await().atMost(30, TimeUnit.SECONDS)
+ .untilAsserted(() ->
assertEquals(topic1.getReplicators().size(), 1));
- // stop service in the replication cluster to build replication backlog
+ // stop service in the replication cluster to build replication backlog
broker3.stop();
- retryStrategically((test) -> broker3.getPulsarService() == null, 10,
1000);
- assertNull(pulsar3.getBrokerService());
+ Awaitility.await()
+ .untilAsserted(() -> assertNull(broker3.getPulsarService()));
- //publish messages into topic in "r1" cluster
+ // publish messages into topic in "r1" cluster
int n = 5;
for (int i = 0; i < n; i++) {
producer1.send("test1".getBytes());
}
- retryStrategically((test) -> topic1.isReplicationBacklogExist(), 10,
1000);
- assertTrue(topic1.isReplicationBacklogExist());
+ Awaitility.await()
+ .untilAsserted(() ->
assertTrue(topic1.isReplicationBacklogExist()));
@Cleanup
PulsarClient client2 =
PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0,
TimeUnit.SECONDS)
@@ -484,7 +486,8 @@ public class ClusterMigrationTest {
log.info("name of topic 2 - {}", topic2.getName());
assertFalse(topic2.getProducers().isEmpty());
- retryStrategically((test) -> topic2.getReplicators().size() == 1, 10,
2000);
+ Awaitility.await().atMost(20, TimeUnit.SECONDS)
+ .untilAsserted(() ->
assertEquals(topic2.getReplicators().size(), 1));
log.info("replicators should be ready");
ClusterUrl migratedUrl = new
ClusterUrl(pulsar2.getWebServiceAddress(), pulsar2.getWebServiceAddressTls(),
@@ -492,23 +495,15 @@ public class ClusterMigrationTest {
admin1.clusters().updateClusterMigration("r1", true, migratedUrl);
log.info("update cluster migration called");
- retryStrategically((test) -> {
- try {
- topic1.checkClusterMigration().get();
- return true;
- } catch (Exception e) {
- // ok
- }
- return false;
- }, 10, 500);
-
- topic1.checkClusterMigration().get();
+ Awaitility.await().untilAsserted(() -> {
+ topic1.checkClusterMigration().get();
+ });
producer1.sendAsync("test1".getBytes());
// producer is disconnected from cluster-1
- retryStrategically((test) -> topic1.getProducers().isEmpty(), 10, 500);
- assertTrue(topic1.getProducers().isEmpty());
+ Awaitility.await()
+ .untilAsserted(() ->
assertTrue(topic1.getProducers().isEmpty()));
// verify that the disconnected producer is not redirected
// to replication cluster since there is replication backlog.
@@ -516,20 +511,24 @@ public class ClusterMigrationTest {
// Restart the service in cluster "r3".
broker3.restart();
- retryStrategically((test) -> broker3.getPulsarService() != null, 10,
1000);
- assertNotNull(broker3.getPulsarService());
+ Awaitility.await().atMost(30, TimeUnit.SECONDS)
+ .untilAsserted(() ->
assertNotNull(broker3.getPulsarService()));
pulsar3 = broker3.getPulsarService();
// verify that the replication backlog drains once service in cluster
"r3" is restarted.
- retryStrategically((test) -> !topic1.isReplicationBacklogExist(), 10,
1000);
- assertFalse(topic1.isReplicationBacklogExist());
+ // The replicator from r1 needs to reconnect to the restarted broker3,
which may take time
+ // as the new broker's load manager channel needs to be fully
initialized.
+ Awaitility.await().atMost(60, TimeUnit.SECONDS).pollInterval(1,
TimeUnit.SECONDS)
+ .untilAsserted(() ->
assertFalse(topic1.isReplicationBacklogExist()));
// verify that the producer1 is now connected to migrated cluster "r2"
since backlog is cleared.
topic1.checkClusterMigration().get();
- // verify that the producer1 is now is now connected to migrated
cluster "r2" since backlog is cleared.
- retryStrategically((test) -> topic2.getProducers().size() == 2, 10,
500);
- assertEquals(topic2.getProducers().size(), 2);
+ // verify that the producer1 is now connected to migrated cluster "r2"
since backlog is cleared.
+ // The producer reconnection uses exponential backoff after
TopicMigratedException, so
+ // it may take a while for the producer to reconnect to the migrated
cluster.
+ Awaitility.await().atMost(90, TimeUnit.SECONDS).pollInterval(1,
TimeUnit.SECONDS)
+ .untilAsserted(() ->
assertEquals(topic2.getProducers().size(), 2));
client1.close();
client2.close();