This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b56c1de916b [fix][test] Fix flaky PulsarFunctionsJavaThreadTest.testTumblingCountWindowTest (#25590)
b56c1de916b is described below
commit b56c1de916b2c0833253c038e362e94a3ad03607
Author: Praveen Kumar <[email protected]>
AuthorDate: Tue Apr 28 19:31:29 2026 +0530
[fix][test] Fix flaky PulsarFunctionsJavaThreadTest.testTumblingCountWindowTest (#25590)
---
.../integration/functions/PulsarFunctionsTest.java | 27 ++++++++++++++--------
1 file changed, 17 insertions(+), 10 deletions(-)
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index ed3aeafe6e4..86dc71f16fb 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -330,11 +330,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
for (int i = 0; i < 3; i++) {
producer.send(String.format("%d", i).getBytes());
}
- TopicStats stats = pulsarAdmin.topics().getStats(inputTopicName, true);
- SubscriptionStats subStats = stats.getSubscriptions().get("public/default/" + functionName);
- assertNotNull(subStats);
- assertEquals(3, subStats.getMsgBacklog());
- assertEquals(3, subStats.getUnackedMessages());
+ awaitAndVerifySubscriptionStats(inputTopicName, functionName, 3, 3);
for (int i = 3; i < numOfMessages; i++) {
producer.send(String.format("%d", i).getBytes());
@@ -363,17 +359,28 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
assertThat(i).isGreaterThanOrEqualTo(expectedResults.length - 1);
// test that all messages are acked
- stats = pulsarAdmin.topics().getStats(inputTopicName, true);
- subStats = stats.getSubscriptions().get("public/default/" + functionName);
- assertNotNull(subStats);
- assertEquals(0, subStats.getMsgBacklog());
- assertEquals(0, subStats.getUnackedMessages());
+ awaitAndVerifySubscriptionStats(inputTopicName, functionName, 0, 0);
deleteFunction(functionName);
getFunctionInfoNotFound(functionName);
}
+ private void awaitAndVerifySubscriptionStats(String inputTopicName, String functionName, int expectedBacklog,
+ int expectedUnacked) {
+ Awaitility.await()
+ .ignoreExceptions()
+ .untilAsserted(() -> {
+ TopicStats currentStats = pulsarAdmin.topics().getStats(inputTopicName, true);
+ SubscriptionStats currentSubStats =
+ currentStats.getSubscriptions().get("public/default/" + functionName);
+ assertNotNull(currentSubStats);
+ // Compare actual to expected
+ assertEquals(currentSubStats.getMsgBacklog(), expectedBacklog);
+ assertEquals(currentSubStats.getUnackedMessages(), expectedUnacked);
+ });
+ }
+
protected void testFunctionNegAck(Runtime runtime) throws Exception {
if (functionRuntimeType == FunctionRuntimeType.THREAD) {
return;