This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5d03ef31401 [fix][test] Fix flaky TopicListSizeResultCacheTest concurrent requests test (#25357)
5d03ef31401 is described below
commit 5d03ef31401112454e6ed6f6953222c5a3de7c1d
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Mar 19 01:01:24 2026 -0700
[fix][test] Fix flaky TopicListSizeResultCacheTest concurrent requests test (#25357)
---
.../TopicListSizeResultCacheTest.java | 33 ++++++++++++++--------
1 file changed, 22 insertions(+), 11 deletions(-)
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/topiclistlimit/TopicListSizeResultCacheTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/topiclistlimit/TopicListSizeResultCacheTest.java
index cea075886ad..397582d7022 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/topiclistlimit/TopicListSizeResultCacheTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/topiclistlimit/TopicListSizeResultCacheTest.java
@@ -28,6 +28,7 @@ import static org.testng.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -108,7 +109,8 @@ public class TopicListSizeResultCacheTest {
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch completeLatch = new CountDownLatch(numConcurrentRequests);
- List<CompletableFuture<Long>> futures = new ArrayList<>();
+ // Use a thread-safe list since multiple threads call add() concurrently.
+ List<CompletableFuture<Long>> futures = new CopyOnWriteArrayList<>();
// Start concurrent requests
for (int i = 0; i < numConcurrentRequests; i++) {
@@ -128,23 +130,32 @@ public class TopicListSizeResultCacheTest {
startLatch.countDown();
completeLatch.await(5, TimeUnit.SECONDS);
- // First request completes immediately with initial estimate
- assertTrue(futures.get(0).isDone());
- assertEquals(futures.get(0).get().longValue(), 10 * 1024L);
+ // Exactly one request (the first to call getSizeAsync) completes immediately
+ // with the initial estimate. The rest wait for updateSize(). We can't assume
+ // which index in the list corresponds to the "first" caller since threads
+ // race to add their futures to the list.
+ long doneCount = futures.stream().filter(CompletableFuture::isDone).count();
+ assertEquals(doneCount, 1, "Exactly one request should be done (the first caller)");
- // Other requests should be waiting
- for (int i = 1; i < numConcurrentRequests; i++) {
- assertFalse(futures.get(i).isDone(), "Concurrent request " + i + " should be waiting");
- }
+ long waitingCount = futures.stream().filter(f -> !f.isDone()).count();
+ assertEquals(waitingCount, numConcurrentRequests - 1, "All other requests should be waiting");
+
+ CompletableFuture<Long> firstFuture = futures.stream()
+ .filter(CompletableFuture::isDone).findFirst().orElseThrow();
+ assertEquals(firstFuture.get().longValue(), 10 * 1024L);
// Update size to complete waiting requests
long actualSize = 20 * 1024L;
holder.updateSize(actualSize);
// All waiting requests should now complete with the actual size
- for (int i = 1; i < numConcurrentRequests; i++) {
- assertTrue(futures.get(i).isDone(), "Request " + i + " should complete after updateSize");
- assertEquals(futures.get(i).get().longValue(), actualSize);
+ for (CompletableFuture<Long> f : futures) {
+ assertTrue(f.isDone(), "All requests should be done after updateSize");
+ }
+ for (CompletableFuture<Long> f : futures) {
+ long value = f.get().longValue();
+ assertTrue(value == 10 * 1024L || value == actualSize,
+ "Each future should have either the initial estimate or the actual size");
}
}