This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 78e21a335a384b6dd0582bd6efb3b05610157c32 Author: Lari Hotari <[email protected]> AuthorDate: Mon Mar 23 17:10:08 2026 +0200 [fix][test] Fix flaky OneWayReplicatorUsingGlobalZKTest.cleanup (#25389) Co-authored-by: Claude Opus 4.6 (1M context) <[email protected]> (cherry picked from commit ce3429ce878fd2003793b030b3f317deb49a34db) --- .../broker/service/OneWayReplicatorTestBase.java | 27 +++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java index a2cf9d61499..0b033ec269c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java @@ -303,8 +303,12 @@ public abstract class OneWayReplicatorTestBase extends TestRetrySupport { // delete namespaces. waitChangeEventsInit(replicatedNamespace); admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace, Sets.newHashSet(cluster1)); + admin1.namespaces().setNamespaceReplicationClusters( + sourceClusterAlwaysSchemaCompatibleNamespace, Sets.newHashSet(cluster1)); if (!usingGlobalZK) { admin2.namespaces().setNamespaceReplicationClusters(replicatedNamespace, Sets.newHashSet(cluster2)); + admin2.namespaces().setNamespaceReplicationClusters( + sourceClusterAlwaysSchemaCompatibleNamespace, Sets.newHashSet(cluster2)); } // When using global ZK, reducing replication clusters triggers async topic cleanup on removed clusters. // Retry namespace deletion to handle topics that may be in a transitional state. @@ -314,16 +318,33 @@ public abstract class OneWayReplicatorTestBase extends TestRetrySupport { Awaitility.await().atMost(Duration.ofSeconds(30)).ignoreExceptions().untilAsserted(() -> { admin1.namespaces().deleteNamespace(nonReplicatedNamespace, true); }); + Awaitility.await().atMost(Duration.ofSeconds(30)).ignoreExceptions().untilAsserted(() -> { + admin1.namespaces().deleteNamespace(sourceClusterAlwaysSchemaCompatibleNamespace, true); + }); if (!usingGlobalZK) { - admin2.namespaces().deleteNamespace(replicatedNamespace, true); - admin2.namespaces().deleteNamespace(nonReplicatedNamespace, true); + Awaitility.await().atMost(Duration.ofSeconds(30)).ignoreExceptions().untilAsserted(() -> { + admin2.namespaces().deleteNamespace(replicatedNamespace, true); + }); + Awaitility.await().atMost(Duration.ofSeconds(30)).ignoreExceptions().untilAsserted(() -> { + admin2.namespaces().deleteNamespace(nonReplicatedNamespace, true); + }); + Awaitility.await().atMost(Duration.ofSeconds(30)).ignoreExceptions().untilAsserted(() -> { + admin2.namespaces().deleteNamespace(sourceClusterAlwaysSchemaCompatibleNamespace, true); + }); } } @Override protected void cleanup() throws Exception { // cleanup pulsar resources. - cleanupPulsarResources(); + // Wrap in try-catch to ensure brokers, ZK, and BK are always shut down even if + // namespace deletion fails (e.g., topics in transitional state during async replication cleanup). + try { + cleanupPulsarResources(); + } catch (Exception e) { + log.warn("Failed to cleanup Pulsar resources during shutdown, " + + "continuing with broker/ZK/BK shutdown", e); + } // shutdown. markCurrentSetupNumberCleaned();
