This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ce3429ce878 [fix][test] Fix flaky OneWayReplicatorUsingGlobalZKTest.cleanup (#25389)
ce3429ce878 is described below
commit ce3429ce878fd2003793b030b3f317deb49a34db
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Mar 23 17:10:08 2026 +0200
[fix][test] Fix flaky OneWayReplicatorUsingGlobalZKTest.cleanup (#25389)
Co-authored-by: Claude Opus 4.6 (1M context) <[email protected]>
---
.../broker/service/OneWayReplicatorTestBase.java | 27 +++++++++++++++++++---
1 file changed, 24 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
index 1c43b27e1ec..a5983cecf9c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
@@ -303,8 +303,12 @@ public abstract class OneWayReplicatorTestBase extends TestRetrySupport {
// delete namespaces.
waitChangeEventsInit(replicatedNamespace);
admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace,
Sets.newHashSet(cluster1), true);
+ admin1.namespaces().setNamespaceReplicationClusters(
+ sourceClusterAlwaysSchemaCompatibleNamespace,
Sets.newHashSet(cluster1), true);
if (!usingGlobalZK) {
admin2.namespaces().setNamespaceReplicationClusters(replicatedNamespace,
Sets.newHashSet(cluster2), true);
+ admin2.namespaces().setNamespaceReplicationClusters(
+ sourceClusterAlwaysSchemaCompatibleNamespace,
Sets.newHashSet(cluster2), true);
}
// When using global ZK, reducing replication clusters triggers async topic cleanup on removed clusters.
// Retry namespace deletion to handle topics that may be in a transitional state.
@@ -314,16 +318,33 @@ public abstract class OneWayReplicatorTestBase extends TestRetrySupport {
Awaitility.await().atMost(Duration.ofSeconds(30)).ignoreExceptions().untilAsserted(()
-> {
admin1.namespaces().deleteNamespace(nonReplicatedNamespace, true);
});
+
Awaitility.await().atMost(Duration.ofSeconds(30)).ignoreExceptions().untilAsserted(()
-> {
+
admin1.namespaces().deleteNamespace(sourceClusterAlwaysSchemaCompatibleNamespace,
true);
+ });
if (!usingGlobalZK) {
- admin2.namespaces().deleteNamespace(replicatedNamespace, true);
- admin2.namespaces().deleteNamespace(nonReplicatedNamespace, true);
+
Awaitility.await().atMost(Duration.ofSeconds(30)).ignoreExceptions().untilAsserted(()
-> {
+ admin2.namespaces().deleteNamespace(replicatedNamespace, true);
+ });
+
Awaitility.await().atMost(Duration.ofSeconds(30)).ignoreExceptions().untilAsserted(()
-> {
+ admin2.namespaces().deleteNamespace(nonReplicatedNamespace,
true);
+ });
+
Awaitility.await().atMost(Duration.ofSeconds(30)).ignoreExceptions().untilAsserted(()
-> {
+
admin2.namespaces().deleteNamespace(sourceClusterAlwaysSchemaCompatibleNamespace,
true);
+ });
}
}
@Override
protected void cleanup() throws Exception {
// cleanup pulsar resources.
- cleanupPulsarResources();
+ // Wrap in try-catch to ensure brokers, ZK, and BK are always shut down even if
+ // namespace deletion fails (e.g., topics in transitional state during async replication cleanup).
+ try {
+ cleanupPulsarResources();
+ } catch (Exception e) {
+ log.warn("Failed to cleanup Pulsar resources during shutdown, "
+ + "continuing with broker/ZK/BK shutdown", e);
+ }
// shutdown.
markCurrentSetupNumberCleaned();