This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3982de020ac [fix][test] Fix flaky
IsolatedBookieEnsemblePlacementPolicyTest.testMetadataStoreCases (#25474)
3982de020ac is described below
commit 3982de020ac3b0e765fb3ea475dd46448e6c5144
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Apr 6 19:55:13 2026 -0700
[fix][test] Fix flaky
IsolatedBookieEnsemblePlacementPolicyTest.testMetadataStoreCases (#25474)
---
.../IsolatedBookieEnsemblePlacementPolicyTest.java | 56 ++++++++--------------
1 file changed, 21 insertions(+), 35 deletions(-)
diff --git
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
index d3fae728483..91c56988c10 100644
---
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
+++
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
@@ -149,39 +149,23 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
rackConfiguration1.put("group2", secondaryBookieGroup);
initialFuture.complete(Optional.of(rackConfiguration1));
- long waitTime = 2000;
+ //The waitingCompleteFuture has group1 and group2.
+ BookiesRackConfiguration rackConfiguration2 = new
BookiesRackConfiguration();
+ Map<String, BookieInfo> mainBookieGroup2 = new HashMap<>();
+ mainBookieGroup2.put(BOOKIE1,
BookieInfo.builder().rack("rack0").build());
+ mainBookieGroup2.put(BOOKIE2,
BookieInfo.builder().rack("rack1").build());
+ mainBookieGroup2.put(BOOKIE4,
BookieInfo.builder().rack("rack0").build());
+
+ Map<String, BookieInfo> secondaryBookieGroup2 = new HashMap<>();
+ secondaryBookieGroup2.put(BOOKIE3,
BookieInfo.builder().rack("rack0").build());
+ rackConfiguration2.put("group1", mainBookieGroup2);
+ rackConfiguration2.put("group2", secondaryBookieGroup2);
+
+ // Use manually-controlled futures instead of Thread.sleep-based timing
+ // to avoid races where the thread completes earlier than expected.
CompletableFuture<Optional<BookiesRackConfiguration>>
waitingCompleteFuture = new CompletableFuture<>();
- new Thread(() -> {
- try {
- Thread.sleep(waitTime);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- //The waitingCompleteFuture has group1 and group2.
- BookiesRackConfiguration rackConfiguration2 = new
BookiesRackConfiguration();
- Map<String, BookieInfo> mainBookieGroup2 = new HashMap<>();
- mainBookieGroup2.put(BOOKIE1,
BookieInfo.builder().rack("rack0").build());
- mainBookieGroup2.put(BOOKIE2,
BookieInfo.builder().rack("rack1").build());
- mainBookieGroup2.put(BOOKIE4,
BookieInfo.builder().rack("rack0").build());
-
- Map<String, BookieInfo> secondaryBookieGroup2 = new HashMap<>();
- secondaryBookieGroup2.put(BOOKIE3,
BookieInfo.builder().rack("rack0").build());
- rackConfiguration2.put("group1", mainBookieGroup2);
- rackConfiguration2.put("group2", secondaryBookieGroup2);
- waitingCompleteFuture.complete(Optional.of(rackConfiguration2));
- }).start();
-
- long longWaitTime = 4000;
+ //The emptyFuture means that the zk node /bookies has already been removed.
CompletableFuture<Optional<BookiesRackConfiguration>> emptyFuture =
new CompletableFuture<>();
- new Thread(() -> {
- try {
- Thread.sleep(longWaitTime);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- //The emptyFuture means that the zk node /bookies already be
removed.
- emptyFuture.complete(Optional.empty());
- }).start();
//Return different future means that cache expire.
when(cache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH))
@@ -206,12 +190,13 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
isolationPolicy.getExcludedBookiesWithIsolationGroups(2,
groups);
assertTrue(blacklist.isEmpty());
- //waitingCompleteFuture, the future is waiting done.
+ //waitingCompleteFuture, the future is not yet done.
blacklist =
isolationPolicy.getExcludedBookiesWithIsolationGroups(2,
groups);
assertTrue(blacklist.isEmpty());
- Thread.sleep(waitTime);
+ // Complete the future manually (simulates cache expiry and refresh)
+ waitingCompleteFuture.complete(Optional.of(rackConfiguration2));
//waitingCompleteFuture, the future is already done.
blacklist =
@@ -221,7 +206,7 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
BookieId excludeBookie = blacklist.iterator().next();
assertEquals(excludeBookie.toString(), BOOKIE3);
- //emptyFuture, the future is waiting done.
+ //emptyFuture, the future is not yet done.
blacklist =
isolationPolicy.getExcludedBookiesWithIsolationGroups(2,
groups);
assertFalse(blacklist.isEmpty());
@@ -229,7 +214,8 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
excludeBookie = blacklist.iterator().next();
assertEquals(excludeBookie.toString(), BOOKIE3);
- Thread.sleep(longWaitTime - waitTime);
+ // Complete the empty future manually (simulates zk node removal)
+ emptyFuture.complete(Optional.empty());
//emptyFuture, the future is already done.
blacklist =