This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 90c81d74015 [improve][broker] PIP-473 P5.2: scalable-topics TC timeout 
+ GC sweeps (#25884)
90c81d74015 is described below

commit 90c81d740151d8966ddc38c920e02f2eb6bcb01c
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Jun 1 18:34:31 2026 -0700

    [improve][broker] PIP-473 P5.2: scalable-topics TC timeout + GC sweeps 
(#25884)
---
 .../apache/pulsar/broker/ServiceConfiguration.java |  28 +++
 .../org/apache/pulsar/broker/PulsarService.java    |   5 +
 .../coordinator/v5/TransactionCoordinatorV5.java   | 257 +++++++++++++++++++--
 .../broker/transaction/metadata/TxnPaths.java      |  14 ++
 .../v5/TransactionCoordinatorV5Test.java           |  94 +++++++-
 5 files changed, 384 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index e5ecea2c44b..e5e69ea14ad 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -3806,6 +3806,34 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     )
     private boolean transactionCoordinatorScalableTopicsEnabled = false;
 
+    @FieldContext(
+            category = CATEGORY_TRANSACTION,
+            doc = "Interval, in seconds, at which the scalable-topics 
transaction coordinator sweeps"
+                    + " for timed-out open transactions and aborts them. Only 
the broker that owns"
+                    + " partition 0 of the transaction-coordinator-assign 
topic runs the sweep."
+                    + " Only relevant when 
transactionCoordinatorScalableTopicsEnabled = true."
+    )
+    private int 
transactionCoordinatorScalableTopicsTimeoutSweepIntervalSeconds = 60;
+
+    @FieldContext(
+            category = CATEGORY_TRANSACTION,
+            doc = "Interval, in seconds, at which the scalable-topics 
transaction coordinator sweeps"
+                    + " for finalized transactions whose retention has elapsed 
and garbage-collects"
+                    + " their metadata. Only relevant when 
transactionCoordinatorScalableTopicsEnabled"
+                    + " = true."
+    )
+    private int transactionCoordinatorScalableTopicsGcIntervalSeconds = 300;
+
+    @FieldContext(
+            category = CATEGORY_TRANSACTION,
+            doc = "How long, in seconds, a finalized (committed/aborted) 
transaction's metadata is"
+                    + " retained before the scalable-topics transaction 
coordinator's GC sweep is"
+                    + " allowed to delete it. Gives participants time to 
observe the outcome via the"
+                    + " durable per-segment visibility state. Only relevant 
when"
+                    + " transactionCoordinatorScalableTopicsEnabled = true."
+    )
+    private int transactionCoordinatorScalableTopicsGcRetentionSeconds = 900;
+
     @FieldContext(
         category = CATEGORY_TRANSACTION,
             doc = "Class name for transaction metadata store provider"
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index acf79195a3f..f80465ed3c6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -705,6 +705,10 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
             if (transactionTimer != null) {
                 transactionTimer.stop();
             }
+            if (transactionCoordinatorV5 != null) {
+                transactionCoordinatorV5.close();
+                transactionCoordinatorV5 = null;
+            }
             MLPendingAckStoreProvider.closeBufferedWriterMetrics();
             MLTransactionMetadataStoreProvider.closeBufferedWriterMetrics();
             if (this.offloaderStats != null) {
@@ -1046,6 +1050,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
 
                 if (config.isTransactionCoordinatorScalableTopicsEnabled()) {
                     transactionCoordinatorV5 = new 
TransactionCoordinatorV5(this);
+                    transactionCoordinatorV5.start();
                 }
 
                 transactionBufferProvider = TransactionBufferProvider
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5.java
index 1d3df7071ec..d4bfd8581a2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5.java
@@ -18,12 +18,20 @@
  */
 package org.apache.pulsar.broker.transaction.coordinator.v5;
 
+import io.netty.util.concurrent.DefaultThreadFactory;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 import lombok.CustomLog;
 import org.apache.pulsar.broker.PulsarService;
 import 
org.apache.pulsar.broker.transaction.exception.coordinator.TransactionCoordinatorException;
@@ -33,6 +41,7 @@ import org.apache.pulsar.broker.transaction.metadata.TxnIds;
 import org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore;
 import org.apache.pulsar.broker.transaction.metadata.TxnOp;
 import org.apache.pulsar.broker.transaction.metadata.TxnOpKind;
+import org.apache.pulsar.broker.transaction.metadata.TxnPaths;
 import org.apache.pulsar.broker.transaction.metadata.TxnState;
 import org.apache.pulsar.broker.transaction.metadata.Versioned;
 import org.apache.pulsar.client.api.transaction.TxnID;
@@ -46,9 +55,9 @@ import 
org.apache.pulsar.transaction.coordinator.TransactionSubscription;
 import 
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
 
 /**
- * PIP-473 v5 transaction coordinator — broker-side service.
+ * Metadata-driven transaction coordinator for scalable topics — broker-side 
service.
  *
- * <p>Per-partition coordinator. A broker runs the v5 TC for partition {@code 
N} iff it owns
+ * <p>Per-partition coordinator. A broker runs the TC for partition {@code N} 
iff it owns
  * partition {@code N} of {@code 
SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN} — same
  * leader-election mechanism the legacy {@code 
TransactionMetadataStoreService} uses; reusing
  * it keeps the client-side discovery surface unchanged.
@@ -58,9 +67,9 @@ import 
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException
  * <ul>
  *   <li>{@code TC_CLIENT_CONNECT} → {@link #handleClientConnect}</li>
  *   <li>{@code NEW_TXN} → {@link #newTransaction}</li>
- *   <li>{@code ADD_PARTITION_TO_TXN}, {@code ADD_SUBSCRIPTION_TO_TXN} — 
no-ops per PIP; the v5
- *       participants advertise themselves by writing {@code /txn/op} records, 
so the TC doesn't
- *       need a pre-registration step.</li>
+ *   <li>{@code ADD_PARTITION_TO_TXN}, {@code ADD_SUBSCRIPTION_TO_TXN} — 
no-ops; participants
+ *       advertise themselves by writing {@code /txn/op} records, so the TC 
doesn't need a
+ *       pre-registration step.</li>
  *   <li>{@code END_TXN} → {@link #endTransaction}</li>
  * </ul>
  *
@@ -70,8 +79,12 @@ import 
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException
  * {@code (segment, subscription)} pair. The fan-out is metadata-store writes 
(not RPCs) and
  * is bounded by the txn's participant count.
  *
- * <p>P5.1 scope: happy-path newTxn / endTxn. No timeout sweep, no GC sweep — 
those land in
- * P5.2.
+ * <p>Background sweeps: a single elected broker — the owner of partition 0 of
+ * {@code transaction_coordinator_assign} — periodically (a) aborts timed-out 
open transactions
+ * ({@link #sweepTimeouts}) and (b) garbage-collects finalized transactions 
whose retention has
+ * elapsed ({@link #sweepGc}). Concurrent sweeps from a stale owner are still 
safe — every state
+ * transition is a header CAS — so the single-sweeper election is an 
efficiency measure, not a
+ * correctness one.
  */
 @CustomLog
 public class TransactionCoordinatorV5 {
@@ -79,9 +92,83 @@ public class TransactionCoordinatorV5 {
     private final PulsarService pulsar;
     private final TxnMetadataStore txnStore;
 
+    private final long timeoutSweepIntervalMs;
+    private final long gcSweepIntervalMs;
+    private final long gcRetentionMs;
+    private volatile ScheduledExecutorService sweepExecutor;
+    private volatile boolean closed;
+    private final AtomicBoolean timeoutSweepRunning = new AtomicBoolean(false);
+    private final AtomicBoolean gcSweepRunning = new AtomicBoolean(false);
+
     public TransactionCoordinatorV5(PulsarService pulsar) {
         this.pulsar = pulsar;
         this.txnStore = new TxnMetadataStore(pulsar.getLocalMetadataStore());
+        var config = pulsar.getConfiguration();
+        this.timeoutSweepIntervalMs = TimeUnit.SECONDS.toMillis(
+                
config.getTransactionCoordinatorScalableTopicsTimeoutSweepIntervalSeconds());
+        this.gcSweepIntervalMs = TimeUnit.SECONDS.toMillis(
+                
config.getTransactionCoordinatorScalableTopicsGcIntervalSeconds());
+        this.gcRetentionMs = TimeUnit.SECONDS.toMillis(
+                
config.getTransactionCoordinatorScalableTopicsGcRetentionSeconds());
+    }
+
+    // ---- Lifecycle --------------------------------------------------------
+
+    /**
+     * Start the periodic timeout / GC sweeps on a dedicated single-thread 
scheduler. Each tick is
+     * gated by {@link #ifElectedSweeper} so only the partition-0 owner does 
the scan. Idempotent —
+     * a second call is ignored.
+     */
+    public synchronized void start() {
+        if (closed || sweepExecutor != null) {
+            return;
+        }
+        sweepExecutor = Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory("pulsar-txn-v5-sweep"));
+        sweepExecutor.scheduleWithFixedDelay(
+                () -> runSweep("timeout", timeoutSweepRunning, 
this::sweepTimeouts),
+                timeoutSweepIntervalMs, timeoutSweepIntervalMs, 
TimeUnit.MILLISECONDS);
+        sweepExecutor.scheduleWithFixedDelay(
+                () -> runSweep("gc", gcSweepRunning, this::sweepGc),
+                gcSweepIntervalMs, gcSweepIntervalMs, TimeUnit.MILLISECONDS);
+    }
+
+    /** Stop the sweeps. Idempotent. */
+    public synchronized void close() {
+        closed = true;
+        if (sweepExecutor != null) {
+            sweepExecutor.shutdownNow();
+            sweepExecutor = null;
+        }
+    }
+
+    /**
+     * Run one sweep cycle on the scheduler thread and block until it 
completes, so the
+     * fixed-delay scheduling never overlaps two cycles. The {@code running} 
flag is a
+     * defense-in-depth guard: the single-thread scheduler plus the blocking 
{@code get()} already
+     * serialise cycles, but the flag makes overlap impossible even if the 
scheduling were later
+     * changed (e.g. to a fixed-rate or multi-threaded executor). Errors are 
logged and swallowed —
+     * the next tick retries.
+     */
+    private void runSweep(String name, AtomicBoolean running, 
Supplier<CompletableFuture<Void>> sweep) {
+        if (closed || !running.compareAndSet(false, true)) {
+            return;
+        }
+        try {
+            sweep.get().get();
+        } catch (InterruptedException ie) {
+            // shutdownNow() interrupted the sweep thread mid-wait — restore 
the thread's interrupt status and exit
+            // quietly; this is the expected shutdown signal, not a failure.
+            Thread.currentThread().interrupt();
+        } catch (Throwable t) {
+            if (closed) {
+                // close() raced with an in-flight async chain; not worth a 
WARN.
+                return;
+            }
+            log.warn().attr("sweep", name).exception(t).log("v5 TC sweep cycle 
failed; will retry");
+        } finally {
+            running.set(false);
+        }
     }
 
     // ---- TC client connect ------------------------------------------------
@@ -232,19 +319,163 @@ public class TransactionCoordinatorV5 {
             }
         }).thenCompose(__ -> {
             TxnEvent event = new TxnEvent(txnIdKey, decision);
-            CompletableFuture<?>[] publishes = new CompletableFuture<?>[
-                    writeSegments.size() + ackParticipants.size()];
-            int i = 0;
+            List<CompletableFuture<Void>> publishes = new ArrayList<>(
+                    writeSegments.size() + ackParticipants.size());
             for (String segment : writeSegments) {
-                publishes[i++] = txnStore.publishSegmentEvent(segment, event);
+                publishes.add(txnStore.publishSegmentEvent(segment, 
event).thenApply(s -> null));
             }
             for (AckParticipant p : ackParticipants) {
-                publishes[i++] = 
txnStore.publishSubscriptionEvent(p.segment(), p.subscription(), event);
+                publishes.add(txnStore.publishSubscriptionEvent(p.segment(), 
p.subscription(), event)
+                        .thenApply(s -> null));
             }
-            return CompletableFuture.allOf(publishes);
+            return FutureUtil.waitForAll(publishes);
+        });
+    }
+
+    // ---- Sweeps -----------------------------------------------------------
+
+    /**
+     * Abort transactions whose deadline has passed. Scans the by-deadline 
index up to "now" and
+     * drives each through {@link #endTransaction} with {@code ABORT}, which 
re-reads and CAS-guards
+     * the header — so a txn the client commits in the same window is left 
alone (the CAS loses and
+     * the resulting InvalidTxnStatus / BadVersion is treated as a benign 
race).
+     */
+    CompletableFuture<Void> sweepTimeouts() {
+        return ifElectedSweeper(() -> {
+            long now = System.currentTimeMillis();
+            List<TxnID> expired = Collections.synchronizedList(new 
ArrayList<>());
+            return txnStore.listOpenByDeadlineRange(null, now, new 
ScanConsumer() {
+                @Override
+                public void onNext(GetResult r) {
+                    String txnIdKey = 
TxnPaths.txnIdFromHeaderPath(r.getStat().getPath());
+                    if (txnIdKey != null) {
+                        expired.add(TxnIds.fromKey(txnIdKey));
+                    }
+                }
+
+                @Override
+                public void onError(Throwable throwable) {
+                    log.warn().exception(throwable).log("Timeout-sweep 
deadline scan errored");
+                }
+
+                @Override
+                public void onCompleted() {
+                }
+            }).thenCompose(__ -> {
+                List<CompletableFuture<Void>> aborts = new 
ArrayList<>(expired.size());
+                for (TxnID txnId : expired) {
+                    aborts.add(endTransaction(txnId, TxnAction.ABORT_VALUE)
+                            .exceptionally(ex -> {
+                                // Benign: the client may have 
committed/aborted it between the scan
+                                // and our CAS, or another sweeper got there 
first.
+                                log.debug().attr("txnId", txnId).exception(ex)
+                                        .log("Timeout-sweep abort skipped");
+                                return null;
+                            }));
+                }
+                return FutureUtil.waitForAll(aborts);
+            });
         });
     }
 
+    /**
+     * Garbage-collect finalized transactions whose retention window has 
elapsed. For each terminal
+     * state, scans the by-final-state index up to {@code now - retention} and 
applies
+     * {@link #gcOneTxn}.
+     */
+    CompletableFuture<Void> sweepGc() {
+        return ifElectedSweeper(() -> {
+            long cutoff = System.currentTimeMillis() - gcRetentionMs;
+            return gcFinalized(TxnState.COMMITTED, cutoff)
+                    .thenCompose(__ -> gcFinalized(TxnState.ABORTED, cutoff));
+        });
+    }
+
+    private CompletableFuture<Void> gcFinalized(TxnState state, long cutoffMs) 
{
+        List<Versioned<String>> candidates = Collections.synchronizedList(new 
ArrayList<>());
+        return txnStore.listFinalizedByStateAndTimeRange(state, null, 
cutoffMs, new ScanConsumer() {
+            @Override
+            public void onNext(GetResult r) {
+                String txnIdKey = 
TxnPaths.txnIdFromHeaderPath(r.getStat().getPath());
+                if (txnIdKey != null) {
+                    candidates.add(new Versioned<>(txnIdKey, 
r.getStat().getVersion()));
+                }
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                log.warn().attr("state", 
state).exception(throwable).log("GC-sweep scan errored");
+            }
+
+            @Override
+            public void onCompleted() {
+            }
+        }).thenCompose(__ -> {
+            List<CompletableFuture<Void>> gcs = new 
ArrayList<>(candidates.size());
+            for (Versioned<String> c : candidates) {
+                gcs.add(gcOneTxn(c.value(), c.version(), state));
+            }
+            return FutureUtil.waitForAll(gcs);
+        });
+    }
+
+    /**
+     * GC one finalized txn. If it still has {@code /txn/op} records, some 
participant hasn't applied
+     * the outcome yet — or never received the event (e.g. the TC crashed 
between the header CAS and
+     * the fan-out). Re-drive the fan-out and leave the header in place so the 
participant re-reads
+     * the true outcome; it removes its op records once it applies them, and a 
later GC pass — seeing
+     * no op records — deletes the header. We never delete a header while a 
participant might still
+     * re-read it, so a committed txn's data is never stranded as "unknown".
+     */
+    private CompletableFuture<Void> gcOneTxn(String txnIdKey, long version, 
TxnState state) {
+        TxnID txnId = TxnIds.fromKey(txnIdKey);
+        boolean[] hasOps = {false};
+        return txnStore.listOpsByTxn(txnIdKey, new ScanConsumer() {
+            @Override
+            public void onNext(GetResult r) {
+                hasOps[0] = true;
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                // Treat a scan error as "ops may exist" — safer to retry the 
repair than to delete.
+                hasOps[0] = true;
+                log.warn().attr("txnId", 
txnId).exception(throwable).log("GC-sweep op scan errored");
+            }
+
+            @Override
+            public void onCompleted() {
+            }
+        }).thenCompose(__ -> {
+            if (hasOps[0]) {
+                return fanOutEvents(txnId, txnIdKey, state);
+            }
+            return txnStore.deleteHeader(txnIdKey, version).exceptionally(ex 
-> {
+                // Benign: header changed or was already deleted since the 
scan.
+                log.debug().attr("txnId", txnId).exception(ex).log("GC-sweep 
header delete skipped");
+                return null;
+            });
+        });
+    }
+
+    /**
+     * Run {@code action} only on the elected sweeper — the broker that owns 
partition 0 of
+     * {@code transaction_coordinator_assign}. Not owning it (or any error 
checking ownership) means
+     * "skip this cycle". Correctness doesn't depend on the election: every 
transition is a header
+     * CAS, so a stale owner sweeping concurrently is harmless.
+     */
+    private CompletableFuture<Void> 
ifElectedSweeper(Supplier<CompletableFuture<Void>> action) {
+        if (closed) {
+            return CompletableFuture.completedFuture(null);
+        }
+        String assignPartition0 = 
SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN
+                .getPartition(0).toString();
+        return 
pulsar.getBrokerService().checkTopicNsOwnership(assignPartition0)
+                .handle((v, ex) -> ex == null)
+                .thenCompose(owned -> (owned && !closed)
+                        ? action.get() : 
CompletableFuture.completedFuture(null));
+    }
+
     /** A {@code (segment, subscription)} ack participant; keys the ack 
fan-out de-dup set. */
     private record AckParticipant(String segment, String subscription) {
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
index 7e25ebb33ba..0c5bbcfd972 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
@@ -294,5 +294,19 @@ public final class TxnPaths {
         return name.substring(0, dash);
     }
 
+    /**
+     * Extract the {@code txnId} key from a header path under {@link 
#TXN_HEADER_PREFIX}. The layout
+     * is {@code /txn/id/<txnId>}, so the txnId key is the trailing path 
component.
+     *
+     * @return the txnId key, or {@code null} if {@code headerPath} doesn't 
have the expected shape
+     */
+    public static String txnIdFromHeaderPath(String headerPath) {
+        int lastSlash = headerPath.lastIndexOf('/');
+        if (lastSlash < 0 || lastSlash == headerPath.length() - 1) {
+            return null;
+        }
+        return headerPath.substring(lastSlash + 1);
+    }
+
     private TxnPaths() {}
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5Test.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5Test.java
index 38150874db9..c5f261e85ae 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5Test.java
@@ -27,8 +27,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.transaction.metadata.TxnEvent;
+import org.apache.pulsar.broker.transaction.metadata.TxnHeader;
 import org.apache.pulsar.broker.transaction.metadata.TxnIds;
 import org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore;
 import org.apache.pulsar.broker.transaction.metadata.TxnOp;
@@ -57,6 +59,7 @@ public class TransactionCoordinatorV5Test {
     private MetadataStoreExtended store;
     private TxnMetadataStore txnStore;
     private PulsarService pulsar;
+    private BrokerService brokerService;
     private TransactionCoordinatorV5 tc;
 
     @BeforeMethod
@@ -66,7 +69,11 @@ public class TransactionCoordinatorV5Test {
         txnStore = new TxnMetadataStore(store);
         pulsar = mock(PulsarService.class);
         when(pulsar.getLocalMetadataStore()).thenReturn(store);
-        BrokerService brokerService = mock(BrokerService.class);
+        ServiceConfiguration cfg = new ServiceConfiguration();
+        // GC sweep tests assume retention has already elapsed.
+        cfg.setTransactionCoordinatorScalableTopicsGcRetentionSeconds(0);
+        when(pulsar.getConfiguration()).thenReturn(cfg);
+        brokerService = mock(BrokerService.class);
         when(pulsar.getBrokerService()).thenReturn(brokerService);
         // Default: owned. Tests that want to assert the not-owned path can 
override.
         
when(brokerService.checkTopicNsOwnership(any())).thenReturn(CompletableFuture.completedFuture(null));
@@ -75,6 +82,9 @@ public class TransactionCoordinatorV5Test {
 
     @AfterMethod(alwaysRun = true)
     public void tearDown() throws Exception {
+        if (tc != null) {
+            tc.close();
+        }
         if (store != null) {
             store.close();
         }
@@ -240,4 +250,86 @@ public class TransactionCoordinatorV5Test {
         assertThat(a.getMostSigBits()).isEqualTo(1L);
         assertThat(b.getMostSigBits()).isEqualTo(2L);
     }
+
+    @Test
+    public void sweepTimeouts_abortsExpiredOpenTxnAndFansOut() throws 
Exception {
+        // 1ms timeout → deadline already in the past when the sweep runs.
+        TxnID txnId = tc.newTransaction(TC_ID, 1L, "owner").get();
+        String txnIdKey = TxnIds.toKey(txnId);
+        String segment = "segment://public/default/topic/0000-ffff-0";
+        txnStore.appendOp(txnIdKey,
+                new TxnOp(TxnOpKind.WRITE, segment, null, 5L, 1L, null)).get();
+
+        List<String> received = new ArrayList<>();
+        try (var sub = txnStore.subscribeSegmentEvents(segment, 
received::add)) {
+            tc.sweepTimeouts().get();
+            var header = txnStore.getHeader(txnIdKey).get().orElseThrow();
+            assertThat(header.value().getState()).isEqualTo(TxnState.ABORTED);
+            // Fan-out fires for the participant.
+            Awaitility.await().untilAsserted(() -> 
assertThat(received).isNotEmpty());
+        }
+    }
+
+    @Test
+    public void sweepTimeouts_leavesUnexpiredOpenTxnAlone() throws Exception {
+        TxnID txnId = tc.newTransaction(TC_ID, 60_000L, "owner").get();
+        tc.sweepTimeouts().get();
+        var header = 
txnStore.getHeader(TxnIds.toKey(txnId)).get().orElseThrow();
+        assertThat(header.value().getState()).isEqualTo(TxnState.OPEN);
+    }
+
+    @Test
+    public void sweepGc_deletesHeaderWhenNoOpsRemain() throws Exception {
+        // No participants → fan-out wrote no events → no /txn/op records to 
clean up → GC may
+        // delete the header straight away.
+        TxnID txnId = tc.newTransaction(TC_ID, 60_000L, "owner").get();
+        String txnIdKey = TxnIds.toKey(txnId);
+        tc.endTransaction(txnId, TxnAction.COMMIT_VALUE).get();
+
+        tc.sweepGc().get();
+
+        assertThat(txnStore.getHeader(txnIdKey).get()).isEmpty();
+    }
+
+    @Test
+    public void sweepGc_repairsAndRetainsHeaderWhenOpsRemain() throws 
Exception {
+        // A finalized txn with a leftover /txn/op record — the participant 
either hasn't applied
+        // the outcome yet, or never received the event (TC crashed between 
header CAS and publish).
+        // GC must re-drive the fan-out so the participant re-reads the true 
outcome, and must NOT
+        // delete the header while it could still be re-read (else a COMMITTED 
txn's data would
+        // default to ABORTED).
+        TxnID txnId = tc.newTransaction(TC_ID, 60_000L, "owner").get();
+        String txnIdKey = TxnIds.toKey(txnId);
+        String segment = "segment://public/default/topic/0000-ffff-0";
+        txnStore.appendOp(txnIdKey,
+                new TxnOp(TxnOpKind.WRITE, segment, null, 5L, 1L, null)).get();
+
+        List<String> received = new ArrayList<>();
+        try (var sub = txnStore.subscribeSegmentEvents(segment, 
received::add)) {
+            tc.endTransaction(txnId, TxnAction.COMMIT_VALUE).get();
+            Awaitility.await().untilAsserted(() -> 
assertThat(received).hasSize(1));
+
+            tc.sweepGc().get();
+
+            // Header retained — participant may still need to re-read.
+            TxnHeader header = 
txnStore.getHeader(txnIdKey).get().orElseThrow().value();
+            assertThat(header.getState()).isEqualTo(TxnState.COMMITTED);
+            // Repair re-published the event.
+            Awaitility.await().untilAsserted(() -> 
assertThat(received).hasSize(2));
+        }
+    }
+
+    @Test
+    public void sweeps_skipWhenNotElected() throws Exception {
+        // Override the owned-default with a failure → not the elected sweeper 
→ action skipped.
+        when(brokerService.checkTopicNsOwnership(any())).thenReturn(
+                CompletableFuture.failedFuture(new RuntimeException("not 
owner")));
+
+        TxnID expired = tc.newTransaction(TC_ID, 1L, "owner").get();
+        tc.sweepTimeouts().get();
+
+        // Still OPEN — the sweep never ran because we don't own 
assign-partition 0.
+        var header = 
txnStore.getHeader(TxnIds.toKey(expired)).get().orElseThrow();
+        assertThat(header.value().getState()).isEqualTo(TxnState.OPEN);
+    }
 }

Reply via email to