This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 90c81d74015 [improve][broker] PIP-473 P5.2: scalable-topics TC timeout
+ GC sweeps (#25884)
90c81d74015 is described below
commit 90c81d740151d8966ddc38c920e02f2eb6bcb01c
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Jun 1 18:34:31 2026 -0700
[improve][broker] PIP-473 P5.2: scalable-topics TC timeout + GC sweeps
(#25884)
---
.../apache/pulsar/broker/ServiceConfiguration.java | 28 +++
.../org/apache/pulsar/broker/PulsarService.java | 5 +
.../coordinator/v5/TransactionCoordinatorV5.java | 257 +++++++++++++++++++--
.../broker/transaction/metadata/TxnPaths.java | 14 ++
.../v5/TransactionCoordinatorV5Test.java | 94 +++++++-
5 files changed, 384 insertions(+), 14 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index e5ecea2c44b..e5e69ea14ad 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -3806,6 +3806,34 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private boolean transactionCoordinatorScalableTopicsEnabled = false;
+ @FieldContext(
+ category = CATEGORY_TRANSACTION,
+ doc = "Interval, in seconds, at which the scalable-topics
transaction coordinator sweeps"
+ + " for timed-out open transactions and aborts them. Only
the broker that owns"
+ + " partition 0 of the transaction-coordinator-assign
topic runs the sweep."
+ + " Only relevant when
transactionCoordinatorScalableTopicsEnabled = true."
+ )
+ private int
transactionCoordinatorScalableTopicsTimeoutSweepIntervalSeconds = 60;
+
+ @FieldContext(
+ category = CATEGORY_TRANSACTION,
+ doc = "Interval, in seconds, at which the scalable-topics
transaction coordinator sweeps"
+ + " for finalized transactions whose retention has elapsed
and garbage-collects"
+ + " their metadata. Only relevant when
transactionCoordinatorScalableTopicsEnabled"
+ + " = true."
+ )
+ private int transactionCoordinatorScalableTopicsGcIntervalSeconds = 300;
+
+ @FieldContext(
+ category = CATEGORY_TRANSACTION,
+ doc = "How long, in seconds, a finalized (committed/aborted)
transaction's metadata is"
+ + " retained before the scalable-topics transaction
coordinator's GC sweep is"
+ + " allowed to delete it. Gives participants time to
observe the outcome via the"
+ + " durable per-segment visibility state. Only relevant
when"
+ + " transactionCoordinatorScalableTopicsEnabled = true."
+ )
+ private int transactionCoordinatorScalableTopicsGcRetentionSeconds = 900;
+
@FieldContext(
category = CATEGORY_TRANSACTION,
doc = "Class name for transaction metadata store provider"
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index acf79195a3f..f80465ed3c6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -705,6 +705,10 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
if (transactionTimer != null) {
transactionTimer.stop();
}
+ if (transactionCoordinatorV5 != null) {
+ transactionCoordinatorV5.close();
+ transactionCoordinatorV5 = null;
+ }
MLPendingAckStoreProvider.closeBufferedWriterMetrics();
MLTransactionMetadataStoreProvider.closeBufferedWriterMetrics();
if (this.offloaderStats != null) {
@@ -1046,6 +1050,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
if (config.isTransactionCoordinatorScalableTopicsEnabled()) {
transactionCoordinatorV5 = new
TransactionCoordinatorV5(this);
+ transactionCoordinatorV5.start();
}
transactionBufferProvider = TransactionBufferProvider
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5.java
index 1d3df7071ec..d4bfd8581a2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5.java
@@ -18,12 +18,20 @@
*/
package org.apache.pulsar.broker.transaction.coordinator.v5;
+import io.netty.util.concurrent.DefaultThreadFactory;
import java.time.Duration;
import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
import lombok.CustomLog;
import org.apache.pulsar.broker.PulsarService;
import
org.apache.pulsar.broker.transaction.exception.coordinator.TransactionCoordinatorException;
@@ -33,6 +41,7 @@ import org.apache.pulsar.broker.transaction.metadata.TxnIds;
import org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore;
import org.apache.pulsar.broker.transaction.metadata.TxnOp;
import org.apache.pulsar.broker.transaction.metadata.TxnOpKind;
+import org.apache.pulsar.broker.transaction.metadata.TxnPaths;
import org.apache.pulsar.broker.transaction.metadata.TxnState;
import org.apache.pulsar.broker.transaction.metadata.Versioned;
import org.apache.pulsar.client.api.transaction.TxnID;
@@ -46,9 +55,9 @@ import
org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
/**
- * PIP-473 v5 transaction coordinator — broker-side service.
+ * Metadata-driven transaction coordinator for scalable topics — broker-side
service.
*
- * <p>Per-partition coordinator. A broker runs the v5 TC for partition {@code
N} iff it owns
+ * <p>Per-partition coordinator. A broker runs the TC for partition {@code N}
iff it owns
* partition {@code N} of {@code
SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN} — same
* leader-election mechanism the legacy {@code
TransactionMetadataStoreService} uses; reusing
* it keeps the client-side discovery surface unchanged.
@@ -58,9 +67,9 @@ import
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException
* <ul>
* <li>{@code TC_CLIENT_CONNECT} → {@link #handleClientConnect}</li>
* <li>{@code NEW_TXN} → {@link #newTransaction}</li>
- * <li>{@code ADD_PARTITION_TO_TXN}, {@code ADD_SUBSCRIPTION_TO_TXN} —
no-ops per PIP; the v5
- * participants advertise themselves by writing {@code /txn/op} records,
so the TC doesn't
- * need a pre-registration step.</li>
+ * <li>{@code ADD_PARTITION_TO_TXN}, {@code ADD_SUBSCRIPTION_TO_TXN} —
no-ops; participants
+ * advertise themselves by writing {@code /txn/op} records, so the TC
doesn't need a
+ * pre-registration step.</li>
* <li>{@code END_TXN} → {@link #endTransaction}</li>
* </ul>
*
@@ -70,8 +79,12 @@ import
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException
* {@code (segment, subscription)} pair. The fan-out is metadata-store writes
(not RPCs) and
* is bounded by the txn's participant count.
*
- * <p>P5.1 scope: happy-path newTxn / endTxn. No timeout sweep, no GC sweep —
those land in
- * P5.2.
+ * <p>Background sweeps: a single elected broker — the owner of partition 0 of
+ * {@code transaction_coordinator_assign} — periodically (a) aborts timed-out
open transactions
+ * ({@link #sweepTimeouts}) and (b) garbage-collects finalized transactions
whose retention has
+ * elapsed ({@link #sweepGc}). Concurrent sweeps from a stale owner are still
safe — every state
+ * transition is a header CAS — so the single-sweeper election is an
efficiency measure, not a
+ * correctness one.
*/
@CustomLog
public class TransactionCoordinatorV5 {
@@ -79,9 +92,83 @@ public class TransactionCoordinatorV5 {
private final PulsarService pulsar;
private final TxnMetadataStore txnStore;
+ private final long timeoutSweepIntervalMs;
+ private final long gcSweepIntervalMs;
+ private final long gcRetentionMs;
+ private volatile ScheduledExecutorService sweepExecutor;
+ private volatile boolean closed;
+ private final AtomicBoolean timeoutSweepRunning = new AtomicBoolean(false);
+ private final AtomicBoolean gcSweepRunning = new AtomicBoolean(false);
+
public TransactionCoordinatorV5(PulsarService pulsar) {
this.pulsar = pulsar;
this.txnStore = new TxnMetadataStore(pulsar.getLocalMetadataStore());
+ var config = pulsar.getConfiguration();
+ this.timeoutSweepIntervalMs = TimeUnit.SECONDS.toMillis(
+
config.getTransactionCoordinatorScalableTopicsTimeoutSweepIntervalSeconds());
+ this.gcSweepIntervalMs = TimeUnit.SECONDS.toMillis(
+
config.getTransactionCoordinatorScalableTopicsGcIntervalSeconds());
+ this.gcRetentionMs = TimeUnit.SECONDS.toMillis(
+
config.getTransactionCoordinatorScalableTopicsGcRetentionSeconds());
+ }
+
+ // ---- Lifecycle --------------------------------------------------------
+
+ /**
+ * Start the periodic timeout / GC sweeps on a dedicated single-thread
scheduler. Each tick is
+ * gated by {@link #ifElectedSweeper} so only the partition-0 owner does
the scan. Idempotent —
+ * a second call is ignored, as is any call made after {@link #close()}.
+ */
+ public synchronized void start() {
+ if (closed || sweepExecutor != null) {
+ return;
+ }
+ sweepExecutor = Executors.newSingleThreadScheduledExecutor(
+ new DefaultThreadFactory("pulsar-txn-v5-sweep"));
+ sweepExecutor.scheduleWithFixedDelay(
+ () -> runSweep("timeout", timeoutSweepRunning,
this::sweepTimeouts),
+ timeoutSweepIntervalMs, timeoutSweepIntervalMs,
TimeUnit.MILLISECONDS);
+ sweepExecutor.scheduleWithFixedDelay(
+ () -> runSweep("gc", gcSweepRunning, this::sweepGc),
+ gcSweepIntervalMs, gcSweepIntervalMs, TimeUnit.MILLISECONDS);
+ }
+
+ /** Stop the sweeps for good: once closed, {@link #start()} is a no-op. Idempotent. */
+ public synchronized void close() {
+ closed = true;
+ if (sweepExecutor != null) {
+ sweepExecutor.shutdownNow();
+ sweepExecutor = null;
+ }
+ }
+
+ /**
+ * Run one sweep cycle on the scheduler thread and block until it
completes, so the
+ * fixed-delay scheduling never overlaps two cycles. The {@code running}
flag is a
+ * defense-in-depth guard: the single-thread scheduler plus the blocking
{@code get()} already
+ * serialise cycles, but the flag makes overlap impossible even if the
scheduling were later
+ * changed (e.g. to a fixed-rate or multi-threaded executor). Errors are
logged and swallowed —
+ * the next tick retries.
+ */
+ private void runSweep(String name, AtomicBoolean running,
Supplier<CompletableFuture<Void>> sweep) {
+ if (closed || !running.compareAndSet(false, true)) {
+ return;
+ }
+ try {
+ sweep.get().get();
+ } catch (InterruptedException ie) {
+ // shutdownNow() interrupted the sweep thread mid-wait — restore
the thread's interrupt status and exit
+ // quietly; this is the expected shutdown signal, not a failure.
+ Thread.currentThread().interrupt();
+ } catch (Throwable t) {
+ if (closed) {
+ // close() raced with an in-flight async chain; not worth a
WARN.
+ return;
+ }
+ log.warn().attr("sweep", name).exception(t).log("v5 TC sweep cycle
failed; will retry");
+ } finally {
+ running.set(false);
+ }
}
// ---- TC client connect ------------------------------------------------
@@ -232,19 +319,163 @@ public class TransactionCoordinatorV5 {
}
}).thenCompose(__ -> {
TxnEvent event = new TxnEvent(txnIdKey, decision);
- CompletableFuture<?>[] publishes = new CompletableFuture<?>[
- writeSegments.size() + ackParticipants.size()];
- int i = 0;
+ List<CompletableFuture<Void>> publishes = new ArrayList<>(
+ writeSegments.size() + ackParticipants.size());
for (String segment : writeSegments) {
- publishes[i++] = txnStore.publishSegmentEvent(segment, event);
+ publishes.add(txnStore.publishSegmentEvent(segment,
event).thenApply(s -> null));
}
for (AckParticipant p : ackParticipants) {
- publishes[i++] =
txnStore.publishSubscriptionEvent(p.segment(), p.subscription(), event);
+ publishes.add(txnStore.publishSubscriptionEvent(p.segment(),
p.subscription(), event)
+ .thenApply(s -> null));
}
- return CompletableFuture.allOf(publishes);
+ return FutureUtil.waitForAll(publishes);
+ });
+ }
+
+ // ---- Sweeps -----------------------------------------------------------
+
+ /**
+ * Abort transactions whose deadline has passed. Scans the by-deadline
index up to "now" and
+ * drives each through {@link #endTransaction} with {@code ABORT}, which
re-reads and CAS-guards
+ * the header — so a txn the client commits in the same window is left
alone (the CAS loses and
+ * the resulting InvalidTxnStatus / BadVersion is treated as a benign
race). Any other abort failure is likewise logged at debug and skipped for this cycle.
+ */
+ CompletableFuture<Void> sweepTimeouts() {
+ return ifElectedSweeper(() -> {
+ long now = System.currentTimeMillis();
+ List<TxnID> expired = Collections.synchronizedList(new
ArrayList<>());
+ return txnStore.listOpenByDeadlineRange(null, now, new
ScanConsumer() {
+ @Override
+ public void onNext(GetResult r) {
+ String txnIdKey =
TxnPaths.txnIdFromHeaderPath(r.getStat().getPath());
+ if (txnIdKey != null) {
+ expired.add(TxnIds.fromKey(txnIdKey));
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ log.warn().exception(throwable).log("Timeout-sweep
deadline scan errored");
+ }
+
+ @Override
+ public void onCompleted() {
+ }
+ }).thenCompose(__ -> {
+ List<CompletableFuture<Void>> aborts = new
ArrayList<>(expired.size());
+ for (TxnID txnId : expired) {
+ aborts.add(endTransaction(txnId, TxnAction.ABORT_VALUE)
+ .exceptionally(ex -> {
+ // Benign: the client may have
committed/aborted it between the scan
+ // and our CAS, or another sweeper got there
first.
+ log.debug().attr("txnId", txnId).exception(ex)
+ .log("Timeout-sweep abort skipped");
+ return null;
+ }));
+ }
+ return FutureUtil.waitForAll(aborts);
+ });
});
}
+ /**
+ * Garbage-collect finalized transactions whose retention window has
elapsed. For each terminal
+ * state, scans the by-final-state index up to {@code now - retention} and
applies
+ * {@link #gcOneTxn}.
+ */
+ CompletableFuture<Void> sweepGc() {
+ return ifElectedSweeper(() -> {
+ long cutoff = System.currentTimeMillis() - gcRetentionMs;
+ return gcFinalized(TxnState.COMMITTED, cutoff)
+ .thenCompose(__ -> gcFinalized(TxnState.ABORTED, cutoff));
+ });
+ }
+
+ private CompletableFuture<Void> gcFinalized(TxnState state, long cutoffMs)
{
+ List<Versioned<String>> candidates = Collections.synchronizedList(new
ArrayList<>());
+ return txnStore.listFinalizedByStateAndTimeRange(state, null,
cutoffMs, new ScanConsumer() {
+ @Override
+ public void onNext(GetResult r) {
+ String txnIdKey =
TxnPaths.txnIdFromHeaderPath(r.getStat().getPath());
+ if (txnIdKey != null) {
+ candidates.add(new Versioned<>(txnIdKey,
r.getStat().getVersion()));
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ log.warn().attr("state",
state).exception(throwable).log("GC-sweep scan errored");
+ }
+
+ @Override
+ public void onCompleted() {
+ }
+ }).thenCompose(__ -> {
+ List<CompletableFuture<Void>> gcs = new
ArrayList<>(candidates.size());
+ for (Versioned<String> c : candidates) {
+ gcs.add(gcOneTxn(c.value(), c.version(), state));
+ }
+ return FutureUtil.waitForAll(gcs);
+ });
+ }
+
+ /**
+ * GC one finalized txn. If it still has {@code /txn/op} records, some
participant hasn't applied
+ * the outcome yet — or never received the event (e.g. the TC crashed
between the header CAS and
+ * the fan-out). Re-drive the fan-out and leave the header in place so the
participant re-reads
+ * the true outcome; it removes its op records once it applies them, and a
later GC pass — seeing
+ * no op records — deletes the header. We never delete a header while a
participant might still
+ * re-read it, so a committed txn's data is never stranded as "unknown".
+ */
+ private CompletableFuture<Void> gcOneTxn(String txnIdKey, long version,
TxnState state) {
+ TxnID txnId = TxnIds.fromKey(txnIdKey);
+ boolean[] hasOps = {false};
+ return txnStore.listOpsByTxn(txnIdKey, new ScanConsumer() {
+ @Override
+ public void onNext(GetResult r) {
+ hasOps[0] = true;
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ // Treat a scan error as "ops may exist" — safer to retry the
repair than to delete.
+ hasOps[0] = true;
+ log.warn().attr("txnId",
txnId).exception(throwable).log("GC-sweep op scan errored");
+ }
+
+ @Override
+ public void onCompleted() {
+ }
+ }).thenCompose(__ -> {
+ if (hasOps[0]) {
+ return fanOutEvents(txnId, txnIdKey, state);
+ }
+ return txnStore.deleteHeader(txnIdKey, version).exceptionally(ex
-> {
+ // Benign: header changed or was already deleted since the
scan.
+ log.debug().attr("txnId", txnId).exception(ex).log("GC-sweep
header delete skipped");
+ return null;
+ });
+ });
+ }
+
+ /**
+ * Run {@code action} only on the elected sweeper — the broker that owns
partition 0 of
+ * {@code transaction_coordinator_assign}. Not owning it (or any error
checking ownership) means
+ * "skip this cycle". Correctness doesn't depend on the election: every
transition is a header
+ * CAS, so a stale owner sweeping concurrently is harmless.
+ */
+ private CompletableFuture<Void>
ifElectedSweeper(Supplier<CompletableFuture<Void>> action) {
+ if (closed) {
+ return CompletableFuture.completedFuture(null);
+ }
+ String assignPartition0 =
SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN
+ .getPartition(0).toString();
+ return
pulsar.getBrokerService().checkTopicNsOwnership(assignPartition0)
+ .handle((v, ex) -> ex == null)
+ .thenCompose(owned -> (owned && !closed)
+ ? action.get() :
CompletableFuture.completedFuture(null));
+ }
+
/** A {@code (segment, subscription)} ack participant; keys the ack
fan-out de-dup set. */
private record AckParticipant(String segment, String subscription) {
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
index 7e25ebb33ba..0c5bbcfd972 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
@@ -294,5 +294,19 @@ public final class TxnPaths {
return name.substring(0, dash);
}
+ /**
+ * Extract the {@code txnId} key from a header path under {@link
#TXN_HEADER_PREFIX}. The layout
+ * is {@code /txn/id/<txnId>}, so the txnId key is the trailing path
component.
+ *
+ * @return the txnId key, or {@code null} if {@code headerPath} contains
no {@code '/'} or ends with one; the prefix itself is not validated
+ */
+ public static String txnIdFromHeaderPath(String headerPath) {
+ int lastSlash = headerPath.lastIndexOf('/');
+ if (lastSlash < 0 || lastSlash == headerPath.length() - 1) {
+ return null;
+ }
+ return headerPath.substring(lastSlash + 1);
+ }
+
private TxnPaths() {}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5Test.java
index 38150874db9..c5f261e85ae 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5Test.java
@@ -27,8 +27,10 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.transaction.metadata.TxnEvent;
+import org.apache.pulsar.broker.transaction.metadata.TxnHeader;
import org.apache.pulsar.broker.transaction.metadata.TxnIds;
import org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore;
import org.apache.pulsar.broker.transaction.metadata.TxnOp;
@@ -57,6 +59,7 @@ public class TransactionCoordinatorV5Test {
private MetadataStoreExtended store;
private TxnMetadataStore txnStore;
private PulsarService pulsar;
+ private BrokerService brokerService;
private TransactionCoordinatorV5 tc;
@BeforeMethod
@@ -66,7 +69,11 @@ public class TransactionCoordinatorV5Test {
txnStore = new TxnMetadataStore(store);
pulsar = mock(PulsarService.class);
when(pulsar.getLocalMetadataStore()).thenReturn(store);
- BrokerService brokerService = mock(BrokerService.class);
+ ServiceConfiguration cfg = new ServiceConfiguration();
+ // GC sweep tests assume retention has already elapsed.
+ cfg.setTransactionCoordinatorScalableTopicsGcRetentionSeconds(0);
+ when(pulsar.getConfiguration()).thenReturn(cfg);
+ brokerService = mock(BrokerService.class);
when(pulsar.getBrokerService()).thenReturn(brokerService);
// Default: owned. Tests that want to assert the not-owned path can
override.
when(brokerService.checkTopicNsOwnership(any())).thenReturn(CompletableFuture.completedFuture(null));
@@ -75,6 +82,9 @@ public class TransactionCoordinatorV5Test {
@AfterMethod(alwaysRun = true)
public void tearDown() throws Exception {
+ if (tc != null) {
+ tc.close();
+ }
if (store != null) {
store.close();
}
@@ -240,4 +250,86 @@ public class TransactionCoordinatorV5Test {
assertThat(a.getMostSigBits()).isEqualTo(1L);
assertThat(b.getMostSigBits()).isEqualTo(2L);
}
+
+ @Test
+ public void sweepTimeouts_abortsExpiredOpenTxnAndFansOut() throws
Exception {
+ // 1ms timeout → deadline already in the past when the sweep runs.
+ TxnID txnId = tc.newTransaction(TC_ID, 1L, "owner").get();
+ String txnIdKey = TxnIds.toKey(txnId);
+ String segment = "segment://public/default/topic/0000-ffff-0";
+ txnStore.appendOp(txnIdKey,
+ new TxnOp(TxnOpKind.WRITE, segment, null, 5L, 1L, null)).get();
+
+ List<String> received = new ArrayList<>();
+ try (var sub = txnStore.subscribeSegmentEvents(segment,
received::add)) {
+ tc.sweepTimeouts().get();
+ var header = txnStore.getHeader(txnIdKey).get().orElseThrow();
+ assertThat(header.value().getState()).isEqualTo(TxnState.ABORTED);
+ // Fan-out fires for the participant.
+ Awaitility.await().untilAsserted(() ->
assertThat(received).isNotEmpty());
+ }
+ }
+
+ @Test
+ public void sweepTimeouts_leavesUnexpiredOpenTxnAlone() throws Exception {
+ TxnID txnId = tc.newTransaction(TC_ID, 60_000L, "owner").get();
+ tc.sweepTimeouts().get();
+ var header =
txnStore.getHeader(TxnIds.toKey(txnId)).get().orElseThrow();
+ assertThat(header.value().getState()).isEqualTo(TxnState.OPEN);
+ }
+
+ @Test
+ public void sweepGc_deletesHeaderWhenNoOpsRemain() throws Exception {
+ // No participants → fan-out wrote no events → no /txn/op records to
clean up → GC may
+ // delete the header straight away.
+ TxnID txnId = tc.newTransaction(TC_ID, 60_000L, "owner").get();
+ String txnIdKey = TxnIds.toKey(txnId);
+ tc.endTransaction(txnId, TxnAction.COMMIT_VALUE).get();
+
+ tc.sweepGc().get();
+
+ assertThat(txnStore.getHeader(txnIdKey).get()).isEmpty();
+ }
+
+ @Test
+ public void sweepGc_repairsAndRetainsHeaderWhenOpsRemain() throws
Exception {
+ // A finalized txn with a leftover /txn/op record — the participant
either hasn't applied
+ // the outcome yet, or never received the event (TC crashed between
header CAS and publish).
+ // GC must re-drive the fan-out so the participant re-reads the true
outcome, and must NOT
+ // delete the header while it could still be re-read (else a COMMITTED
txn's data would
+ // default to ABORTED).
+ TxnID txnId = tc.newTransaction(TC_ID, 60_000L, "owner").get();
+ String txnIdKey = TxnIds.toKey(txnId);
+ String segment = "segment://public/default/topic/0000-ffff-0";
+ txnStore.appendOp(txnIdKey,
+ new TxnOp(TxnOpKind.WRITE, segment, null, 5L, 1L, null)).get();
+
+ List<String> received = new ArrayList<>();
+ try (var sub = txnStore.subscribeSegmentEvents(segment,
received::add)) {
+ tc.endTransaction(txnId, TxnAction.COMMIT_VALUE).get();
+ Awaitility.await().untilAsserted(() ->
assertThat(received).hasSize(1));
+
+ tc.sweepGc().get();
+
+ // Header retained — participant may still need to re-read.
+ TxnHeader header =
txnStore.getHeader(txnIdKey).get().orElseThrow().value();
+ assertThat(header.getState()).isEqualTo(TxnState.COMMITTED);
+ // Repair re-published the event.
+ Awaitility.await().untilAsserted(() ->
assertThat(received).hasSize(2));
+ }
+ }
+
+ @Test
+ public void sweeps_skipWhenNotElected() throws Exception {
+ // Override the owned-default with a failure → not the elected sweeper
→ action skipped.
+ when(brokerService.checkTopicNsOwnership(any())).thenReturn(
+ CompletableFuture.failedFuture(new RuntimeException("not
owner")));
+
+ TxnID expired = tc.newTransaction(TC_ID, 1L, "owner").get();
+ tc.sweepTimeouts().get();
+
+ // Still OPEN — the sweep never ran because we don't own
assign-partition 0.
+ var header =
txnStore.getHeader(TxnIds.toKey(expired)).get().orElseThrow();
+ assertThat(header.value().getState()).isEqualTo(TxnState.OPEN);
+ }
}