This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b56c1de916b [fix][test] Fix flaky 
PulsarFunctionsJavaThreadTest.testTumblingCountWindowTest (#25590)
b56c1de916b is described below

commit b56c1de916b2c0833253c038e362e94a3ad03607
Author: Praveen Kumar <[email protected]>
AuthorDate: Tue Apr 28 19:31:29 2026 +0530

    [fix][test] Fix flaky 
PulsarFunctionsJavaThreadTest.testTumblingCountWindowTest (#25590)
---
 .../integration/functions/PulsarFunctionsTest.java | 27 ++++++++++++++--------
 1 file changed, 17 insertions(+), 10 deletions(-)

diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index ed3aeafe6e4..86dc71f16fb 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -330,11 +330,7 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
         for (int i = 0; i < 3; i++) {
             producer.send(String.format("%d", i).getBytes());
         }
-        TopicStats stats = pulsarAdmin.topics().getStats(inputTopicName, true);
-        SubscriptionStats subStats = 
stats.getSubscriptions().get("public/default/" + functionName);
-        assertNotNull(subStats);
-        assertEquals(3, subStats.getMsgBacklog());
-        assertEquals(3, subStats.getUnackedMessages());
+        awaitAndVerifySubscriptionStats(inputTopicName, functionName, 3, 3);
 
         for (int i = 3; i < numOfMessages; i++) {
             producer.send(String.format("%d", i).getBytes());
@@ -363,17 +359,28 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
         assertThat(i).isGreaterThanOrEqualTo(expectedResults.length - 1);
 
         // test that all messages are acked
-        stats = pulsarAdmin.topics().getStats(inputTopicName, true);
-        subStats = stats.getSubscriptions().get("public/default/" + 
functionName);
-        assertNotNull(subStats);
-        assertEquals(0, subStats.getMsgBacklog());
-        assertEquals(0, subStats.getUnackedMessages());
+        awaitAndVerifySubscriptionStats(inputTopicName, functionName, 0, 0);
 
         deleteFunction(functionName);
 
         getFunctionInfoNotFound(functionName);
     }
 
+    private void awaitAndVerifySubscriptionStats(String inputTopicName, String 
functionName, int expectedBacklog,
+                                                 int expectedUnacked) {
+        Awaitility.await()
+                .ignoreExceptions()
+                .untilAsserted(() -> {
+                    TopicStats currentStats = 
pulsarAdmin.topics().getStats(inputTopicName, true);
+                    SubscriptionStats currentSubStats =
+                            
currentStats.getSubscriptions().get("public/default/" + functionName);
+                    assertNotNull(currentSubStats);
+                    // Compare actual to expected
+                    assertEquals(currentSubStats.getMsgBacklog(), 
expectedBacklog);
+                    assertEquals(currentSubStats.getUnackedMessages(), 
expectedUnacked);
+                });
+    }
+
     protected void testFunctionNegAck(Runtime runtime) throws Exception {
         if (functionRuntimeType == FunctionRuntimeType.THREAD) {
             return;

Reply via email to