This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ef010c3dcdf [fix][test] PIP-473: deflake
TransactionCoordinatorV5Test.sweepTimeouts_abortsExpiredOpenTxnAndFansOut
(#25961)
ef010c3dcdf is described below
commit ef010c3dcdf278122955ebdfd9faaff366b3ef53
Author: Matteo Merli <[email protected]>
AuthorDate: Sun Jun 7 23:37:52 2026 -0700
[fix][test] PIP-473: deflake
TransactionCoordinatorV5Test.sweepTimeouts_abortsExpiredOpenTxnAndFansOut
(#25961)
---
.../coordinator/v5/TransactionCoordinatorV5Test.java | 11 ++++++++---
1 file changed, 8 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5Test.java
index 26badeacf6c..232dd559327 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5Test.java
@@ -285,9 +285,14 @@ public class TransactionCoordinatorV5Test {
List<String> received = new ArrayList<>();
try (var sub = txnStore.subscribeSegmentEvents(segment,
received::add)) {
- tc.sweepTimeouts().get();
- var header = txnStore.getHeader(txnIdKey).get().orElseThrow();
- assertThat(header.value().getState()).isEqualTo(TxnState.ABORTED);
+ // newTransaction and the first sweep can land in the same
millisecond on a fast machine,
+ // leaving the 1ms-timeout txn not-yet-expired. The sweep is
idempotent, so retry it until
+ // the deadline has elapsed and the txn is aborted.
+ Awaitility.await().untilAsserted(() -> {
+ tc.sweepTimeouts().get();
+ var header = txnStore.getHeader(txnIdKey).get().orElseThrow();
+
assertThat(header.value().getState()).isEqualTo(TxnState.ABORTED);
+ });
// Fan-out fires for the participant.
Awaitility.await().untilAsserted(() ->
assertThat(received).isNotEmpty());
}