This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ef010c3dcdf [fix][test] PIP-473: deflake 
TransactionCoordinatorV5Test.sweepTimeouts_abortsExpiredOpenTxnAndFansOut 
(#25961)
ef010c3dcdf is described below

commit ef010c3dcdf278122955ebdfd9faaff366b3ef53
Author: Matteo Merli <[email protected]>
AuthorDate: Sun Jun 7 23:37:52 2026 -0700

    [fix][test] PIP-473: deflake 
TransactionCoordinatorV5Test.sweepTimeouts_abortsExpiredOpenTxnAndFansOut 
(#25961)
---
 .../coordinator/v5/TransactionCoordinatorV5Test.java          | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5Test.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5Test.java
index 26badeacf6c..232dd559327 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5Test.java
@@ -285,9 +285,14 @@ public class TransactionCoordinatorV5Test {
 
         List<String> received = new ArrayList<>();
         try (var sub = txnStore.subscribeSegmentEvents(segment, 
received::add)) {
-            tc.sweepTimeouts().get();
-            var header = txnStore.getHeader(txnIdKey).get().orElseThrow();
-            assertThat(header.value().getState()).isEqualTo(TxnState.ABORTED);
+            // newTransaction and the first sweep can land in the same 
millisecond on a fast machine,
+            // leaving the 1ms-timeout txn not-yet-expired. The sweep is 
idempotent, so retry it until
+            // the deadline has elapsed and the txn is aborted.
+            Awaitility.await().untilAsserted(() -> {
+                tc.sweepTimeouts().get();
+                var header = txnStore.getHeader(txnIdKey).get().orElseThrow();
+                
assertThat(header.value().getState()).isEqualTo(TxnState.ABORTED);
+            });
             // Fan-out fires for the participant.
             Awaitility.await().untilAsserted(() -> 
assertThat(received).isNotEmpty());
         }

Reply via email to