This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d03ef31401 [fix][test] Fix flaky TopicListSizeResultCacheTest concurrent requests test (#25357)
5d03ef31401 is described below

commit 5d03ef31401112454e6ed6f6953222c5a3de7c1d
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Mar 19 01:01:24 2026 -0700

    [fix][test] Fix flaky TopicListSizeResultCacheTest concurrent requests test (#25357)
---
 .../TopicListSizeResultCacheTest.java              | 33 ++++++++++++++--------
 1 file changed, 22 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/topiclistlimit/TopicListSizeResultCacheTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/topiclistlimit/TopicListSizeResultCacheTest.java
index cea075886ad..397582d7022 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/topiclistlimit/TopicListSizeResultCacheTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/topiclistlimit/TopicListSizeResultCacheTest.java
@@ -28,6 +28,7 @@ import static org.testng.Assert.assertTrue;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -108,7 +109,8 @@ public class TopicListSizeResultCacheTest {
         CountDownLatch startLatch = new CountDownLatch(1);
         CountDownLatch completeLatch = new 
CountDownLatch(numConcurrentRequests);
 
-        List<CompletableFuture<Long>> futures = new ArrayList<>();
+        // Use a thread-safe list since multiple threads call add() 
concurrently.
+        List<CompletableFuture<Long>> futures = new CopyOnWriteArrayList<>();
 
         // Start concurrent requests
         for (int i = 0; i < numConcurrentRequests; i++) {
@@ -128,23 +130,32 @@ public class TopicListSizeResultCacheTest {
         startLatch.countDown();
         completeLatch.await(5, TimeUnit.SECONDS);
 
-        // First request completes immediately with initial estimate
-        assertTrue(futures.get(0).isDone());
-        assertEquals(futures.get(0).get().longValue(), 10 * 1024L);
+        // Exactly one request (the first to call getSizeAsync) completes 
immediately
+        // with the initial estimate. The rest wait for updateSize(). We can't 
assume
+        // which index in the list corresponds to the "first" caller since 
threads
+        // race to add their futures to the list.
+        long doneCount = 
futures.stream().filter(CompletableFuture::isDone).count();
+        assertEquals(doneCount, 1, "Exactly one request should be done (the 
first caller)");
 
-        // Other requests should be waiting
-        for (int i = 1; i < numConcurrentRequests; i++) {
-            assertFalse(futures.get(i).isDone(), "Concurrent request " + i + " 
should be waiting");
-        }
+        long waitingCount = futures.stream().filter(f -> !f.isDone()).count();
+        assertEquals(waitingCount, numConcurrentRequests - 1, "All other 
requests should be waiting");
+
+        CompletableFuture<Long> firstFuture = futures.stream()
+                .filter(CompletableFuture::isDone).findFirst().orElseThrow();
+        assertEquals(firstFuture.get().longValue(), 10 * 1024L);
 
         // Update size to complete waiting requests
         long actualSize = 20 * 1024L;
         holder.updateSize(actualSize);
 
         // All waiting requests should now complete with the actual size
-        for (int i = 1; i < numConcurrentRequests; i++) {
-            assertTrue(futures.get(i).isDone(), "Request " + i + " should 
complete after updateSize");
-            assertEquals(futures.get(i).get().longValue(), actualSize);
+        for (CompletableFuture<Long> f : futures) {
+            assertTrue(f.isDone(), "All requests should be done after 
updateSize");
+        }
+        for (CompletableFuture<Long> f : futures) {
+            long value = f.get().longValue();
+            assertTrue(value == 10 * 1024L || value == actualSize,
+                    "Each future should have either the initial estimate or 
the actual size");
         }
     }
 

Reply via email to