This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 94de2b6806b [improve][txn] PIP-473: GC aborted-transaction records on ML trim and segment drop (#25975)
94de2b6806b is described below
commit 94de2b6806bbc39bddf9c32bde6755ee3a206716
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Jun 9 09:35:28 2026 -0700
[improve][txn] PIP-473: GC aborted-transaction records on ML trim and segment drop (#25975)
---
.../service/scalable/ScalableTopicService.java | 24 +++-
.../buffer/impl/MetadataTransactionBuffer.java | 124 +++++++++++++++++++++
.../transaction/metadata/TxnMetadataStore.java | 40 +++++++
.../service/scalable/ScalableTopicServiceTest.java | 2 +
.../buffer/impl/MetadataTransactionBufferTest.java | 30 +++++
.../transaction/metadata/TxnMetadataStoreTest.java | 35 ++++++
6 files changed, 253 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
index 56b049181ca..a6c6befd6f3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
import org.apache.pulsar.broker.resources.ScalableTopicResources;
import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore;
import org.apache.pulsar.common.api.proto.ScalableConsumerType;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
@@ -232,6 +233,13 @@ public class ScalableTopicService {
* Delete a scalable topic and all its segment topics.
*/
public CompletableFuture<Void> deleteScalableTopic(TopicName topic) {
+ // When transactions are enabled, the segments carry durable /txn/segment-state records
+ // (watermark + aborted-txn records). Delete them alongside the segment topics so they don't
+ // outlive the data. Otherwise txnStore stays null and the per-segment cleanup is a no-op.
+ TxnMetadataStore txnStore =
+ brokerService.getPulsar().getConfiguration().isTransactionCoordinatorEnabled()
+ ? new TxnMetadataStore(brokerService.getPulsar().getLocalMetadataStore())
+ : null;
return releaseController(topic)
.thenCompose(__ -> resources.getScalableTopicMetadataAsync(topic))
.thenCompose(optMd -> {
@@ -239,16 +247,28 @@ public class ScalableTopicService {
return CompletableFuture.completedFuture(null);
}
ScalableTopicMetadata metadata = optMd.get();
- // Delete all underlying segment topics
+ // Delete all underlying segment topics, then (per segment) their durable transaction state.
return FutureUtil.waitForAll(
metadata.getSegments().values().stream()
- .map(segment -> deleteUnderlyingSegmentTopic(topic, segment))
+ .map(segment -> deleteUnderlyingSegmentTopic(topic, segment)
+ .thenCompose(__ -> cleanupSegmentTxnState(txnStore, topic, segment)))
.toList()
);
})
.thenCompose(__ -> resources.deleteScalableTopicAsync(topic));
}
+ /** Delete the durable {@code /txn/segment-state} records for a segment being dropped; no-op when {@code txnStore} is null (transactions disabled). */
+ private CompletableFuture<Void> cleanupSegmentTxnState(TxnMetadataStore txnStore,
+ TopicName parentTopic, SegmentInfo segment) {
+ if (txnStore == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+ String segmentName = SegmentTopicName.fromParent(
+ parentTopic, segment.hashRange(), segment.segmentId()).toString();
+ return txnStore.deleteAllSegmentState(segmentName);
+ }
+
/**
* Register a scalable consumer with the controller leader for {@code
topic}.
* Persists a durable session and returns the consumer's segment
assignment.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
index 099bea5f211..afbdd1b8b3a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.transaction.buffer.impl;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -29,6 +30,10 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import lombok.CustomLog;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -42,6 +47,7 @@ import
org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
+import org.apache.pulsar.broker.transaction.metadata.AbortedTxnRecord;
import org.apache.pulsar.broker.transaction.metadata.SegmentWatermark;
import org.apache.pulsar.broker.transaction.metadata.TxnHeader;
import org.apache.pulsar.broker.transaction.metadata.TxnIds;
@@ -55,6 +61,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.Runnables;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.ScanConsumer;
@@ -132,6 +139,11 @@ public class MetadataTransactionBuffer implements TransactionBuffer {
private final LongAdder committedCount = new LongAdder();
private final LongAdder abortedCount = new LongAdder();
+ /** Periodic task that scans for and deletes aborted-txn records once the segment ML trims past them; {@code null} when no broker executor is reachable. */
+ private final ScheduledFuture<?> abortedGcTask;
+ /** Guards against a new GC cycle starting while the previous async one is still in flight. */
+ private final AtomicBoolean gcRunning = new AtomicBoolean(false);
+
public MetadataTransactionBuffer(PersistentTopic topic, TxnMetadataStore txnStore) {
this.topic = topic;
this.ledger = topic.getManagedLedger();
@@ -140,6 +152,55 @@ public class MetadataTransactionBuffer implements TransactionBuffer {
this.maxReadPositionCallBack = topic.getMaxReadPositionCallBack();
this.maxReadPosition = ledger.getLastConfirmedEntry();
recover();
+ this.abortedGcTask = scheduleAbortedGc(); // null when no broker executor is reachable
+ }
+
+ /**
+ * Schedule the periodic aborted-record GC on the broker executor (fixed delay, interval clamped to
+ * at least 1s, first run after one interval). Returns {@code null} when no executor is reachable
+ * (e.g. a unit test with a mocked topic); such callers drive {@link #pruneTrimmedAbortedTxns()} directly.
+ */
+ private ScheduledFuture<?> scheduleAbortedGc() {
+ ScheduledExecutorService executor = brokerExecutor();
+ if (executor == null) {
+ return null;
+ }
+ long intervalSeconds = Math.max(1, topic.getBrokerService().getPulsar().getConfiguration()
+ .getTransactionCoordinatorScalableTopicsGcIntervalSeconds());
+ long intervalMs = TimeUnit.SECONDS.toMillis(intervalSeconds);
+ // Wrap in catchingAndLoggingThrowables so an unexpected RuntimeException doesn't cancel the
+ // fixed-delay schedule. The gcRunning guard skips a cycle while the previous async sweep is
+ // still in flight (slow metadata store) rather than overlapping sweeps.
+ return executor.scheduleWithFixedDelay(Runnables.catchingAndLoggingThrowables(() -> {
+ if (closed || !gcRunning.compareAndSet(false, true)) {
+ return;
+ }
+ CompletableFuture<Void> sweep;
+ try {
+ sweep = pruneTrimmedAbortedTxns();
+ } catch (Throwable t) {
+ gcRunning.set(false);
+ throw t;
+ }
+ sweep.whenComplete((__, ex) -> {
+ gcRunning.set(false);
+ if (ex != null) {
+ log.warn().attr("segment", segmentName).exception(ex)
+ .log("Aborted-txn GC sweep failed; will retry next cycle");
+ }
+ });
+ }), intervalMs, intervalMs, TimeUnit.MILLISECONDS);
+ }
+
+ private ScheduledExecutorService brokerExecutor() { // the broker's executor, or null if unreachable
+ try {
+ if (topic.getBrokerService() != null && topic.getBrokerService().getPulsar() != null) {
+ return topic.getBrokerService().getPulsar().getExecutor();
+ }
+ } catch (Throwable t) {
+ // Best-effort: e.g. a mocked topic in unit tests has no broker executor; GC is then driven directly.
+ }
+ return null;
}
// ---- Recovery ----------------------------------------------------------
@@ -625,10 +686,73 @@ public class MetadataTransactionBuffer implements TransactionBuffer {
@Override
public CompletableFuture<Void> closeAsync() {
closed = true;
+ if (abortedGcTask != null) {
+ abortedGcTask.cancel(false); // don't interrupt a running cycle; later cycles also bail out on `closed`
+ }
closeSubscriptionQuietly();
return CompletableFuture.completedFuture(null);
}
+ /**
+ * Delete aborted-txn records — and drop their in-memory {@link #abortedTxns} entries — whose
+ * highest position in this segment is strictly below the ML's first still-valid position, i.e. whose
+ * data has been fully trimmed. Safe because a trimmed position is never dispatched, so its abort
+ * filtering is no longer needed; records for still-readable data are retained. Matching keys are
+ * collected by a range scan, removed from memory, then deleted one by one; a failed delete leaves
+ * the durable record for the next cycle to find and retry. Without this both sets grow for the segment's lifetime.
+ */
+ @VisibleForTesting
+ CompletableFuture<Void> pruneTrimmedAbortedTxns() {
+ if (closed) {
+ return CompletableFuture.completedFuture(null);
+ }
+ Position firstValid = ledger.getFirstPosition();
+ if (firstValid == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+ List<String> toPrune = Collections.synchronizedList(new ArrayList<>());
+ return txnStore.scanAbortedTxns(segmentName,
+ TxnPaths.abortedByPositionSegmentLowerBound(segmentName),
+ TxnPaths.abortedByPositionSegmentUpperBound(segmentName),
+ new ScanConsumer() {
+ @Override
+ public void onNext(GetResult r) {
+ String txnIdKey = TxnPaths.txnIdFromAbortedPath(r.getStat().getPath());
+ if (txnIdKey == null) {
+ return;
+ }
+ AbortedTxnRecord rec = TxnMetadataStore.fromJson(r.getValue(), AbortedTxnRecord.class);
+ Position maxPos = PositionFactory.create(rec.maxLedgerId(), rec.maxEntryId());
+ // Strictly below the first valid position → fully trimmed (conservative).
+ if (maxPos.compareTo(firstValid) < 0) {
+ toPrune.add(txnIdKey);
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ log.warn().attr("segment", segmentName).exception(throwable)
+ .log("Aborted-txn GC scan errored");
+ }
+
+ @Override
+ public void onCompleted() {
+ }
+ }).thenCompose(__ -> {
+ if (toPrune.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ synchronized (lock) {
+ toPrune.forEach(abortedTxns::remove);
+ }
+ List<CompletableFuture<Void>> deletes = new ArrayList<>(toPrune.size());
+ for (String txnIdKey : toPrune) {
+ deletes.add(txnStore.deleteAbortedTxn(segmentName, txnIdKey));
+ }
+ return FutureUtil.waitForAll(deletes);
+ });
+ }
+
private void closeSubscriptionQuietly() {
AutoCloseable handle = subscription;
if (handle == null) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
index ebf300b7725..ec7c25b389b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.broker.transaction.metadata;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
@@ -29,6 +31,7 @@ import java.util.function.Consumer;
import lombok.CustomLog;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Option;
@@ -398,6 +401,43 @@ public class TxnMetadataStore {
return store.deleteIfExists(TxnPaths.segmentWatermarkPath(segment),
Optional.empty(), opts);
}
+ /**
+ * Delete all durable per-segment transaction state — the aborted-txn records found by the scan, then the
+ * watermark — when a segment is dropped, so {@code /txn/segment-state} doesn't outlive its data. Idempotent: missing records are no-ops.
+ * NOTE(review): scan errors are only logged here; confirm the scan future then fails, else the watermark is deleted while unscanned aborted records remain.
+ */
+ public CompletableFuture<Void> deleteAllSegmentState(String segment) {
+ List<String> abortedKeys = Collections.synchronizedList(new ArrayList<>());
+ return scanAbortedTxns(segment,
+ TxnPaths.abortedByPositionSegmentLowerBound(segment),
+ TxnPaths.abortedByPositionSegmentUpperBound(segment),
+ new ScanConsumer() {
+ @Override
+ public void onNext(GetResult r) {
+ String txnIdKey = TxnPaths.txnIdFromAbortedPath(r.getStat().getPath());
+ if (txnIdKey != null) {
+ abortedKeys.add(txnIdKey);
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ log.warn().attr("segment", segment).exception(throwable)
+ .log("Segment-state cleanup scan errored");
+ }
+
+ @Override
+ public void onCompleted() {
+ }
+ }).thenCompose(__ -> {
+ List<CompletableFuture<Void>> deletes = new ArrayList<>(abortedKeys.size());
+ for (String txnIdKey : abortedKeys) {
+ deletes.add(deleteAbortedTxn(segment, txnIdKey));
+ }
+ return FutureUtil.waitForAll(deletes);
+ }).thenCompose(__ -> deleteSegmentWatermark(segment));
+ }
+
// ---- TC sequence counter ----------------------------------------------
/**
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicServiceTest.java
index 725016f6e67..47271c2dc02 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicServiceTest.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
import org.apache.pulsar.broker.resources.ScalableTopicResources;
import org.apache.pulsar.broker.resources.SubscriptionType;
@@ -94,6 +95,7 @@ public class ScalableTopicServiceTest {
scalableTopicsAdmin = mock(ScalableTopics.class);
when(brokerService.getPulsar()).thenReturn(pulsar);
+ when(pulsar.getConfiguration()).thenReturn(new ServiceConfiguration());
when(brokerService.getTopicIfExists(anyString()))
.thenReturn(CompletableFuture.completedFuture(Optional.empty()));
when(pulsar.getBrokerId()).thenReturn(BROKER_ID);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
index ec3476916a9..a7f3890111b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
@@ -279,6 +279,36 @@ public class MetadataTransactionBufferTest {
assertThat(tb.isTxnAborted(oldAbortedTxn, PositionFactory.create(3,
5))).isTrue();
}
+ @Test
+ public void pruneTrimmedAborted_dropsBelowFirstValid_retainsAbove() throws Exception {
+ // An aborted txn whose data the ML has fully trimmed (max position below the first valid
+ // position) is dropped from both the durable aborted records and the in-memory set; an
+ // aborted txn whose data is still readable is retained.
+ TxnID trimmedTxn = new TxnID(1, 100); // max position on ledger 1 — will be trimmed away
+ TxnID liveTxn = new TxnID(1, 200); // max position on ledger 10 — still readable
+ txnStore.putAbortedTxn(SEGMENT, TxnIds.toKey(trimmedTxn), 1L, 5L).get();
+ txnStore.putAbortedTxn(SEGMENT, TxnIds.toKey(liveTxn), 10L, 5L).get();
+
+ MetadataTransactionBuffer tb = new MetadataTransactionBuffer(topic, txnStore);
+ tb.checkIfTBRecoverCompletely().get();
+ assertThat(tb.isTxnAborted(trimmedTxn, PositionFactory.create(1, 5))).isTrue();
+ assertThat(tb.isTxnAborted(liveTxn, PositionFactory.create(10, 5))).isTrue();
+
+ // The ML has trimmed everything below ledger 5: (1,5) < firstValid (5,0) <= (10,5).
+ when(ledger.getFirstPosition()).thenReturn(PositionFactory.create(5, 0));
+ tb.pruneTrimmedAbortedTxns().get();
+
+ // In-memory: trimmed dropped, live retained.
+ assertThat(tb.isTxnAborted(trimmedTxn, PositionFactory.create(1, 5))).isFalse();
+ assertThat(tb.isTxnAborted(liveTxn, PositionFactory.create(10, 5))).isTrue();
+
+ // Durable record also deleted: a fresh TB recovers only the live txn.
+ MetadataTransactionBuffer tb2 = new MetadataTransactionBuffer(topic, txnStore);
+ tb2.checkIfTBRecoverCompletely().get();
+ assertThat(tb2.isTxnAborted(trimmedTxn, PositionFactory.create(1, 5))).isFalse();
+ assertThat(tb2.isTxnAborted(liveTxn, PositionFactory.create(10, 5))).isTrue();
+ }
+
@Test
public void recoveryDiscoveredOpenTxn_pinsAtWatermark() throws Exception {
// /txn/op records exist for an open txn (broker was processing
publishes for txn T;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStoreTest.java
index 40c04ec0a8f..19d22f3f46e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStoreTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStoreTest.java
@@ -24,6 +24,7 @@ import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import lombok.Cleanup;
import org.apache.pulsar.metadata.api.GetResult;
@@ -219,6 +220,40 @@ public class TxnMetadataStoreTest {
assertThat(received).isNotEmpty().last().asString().isEqualTo(s.getPath()));
}
+ @Test
+ public void deleteAllSegmentState_removesAbortedRecordsAndWatermark() throws Exception {
+ @Cleanup MetadataStore store = newMemoryStore();
+ TxnMetadataStore txn = new TxnMetadataStore(store);
+ String segment = "segment://public/default/topic/0000-ffff-0";
+
+ txn.putAbortedTxn(segment, "t1", 1L, 5L).get();
+ txn.putAbortedTxn(segment, "t2", 2L, 7L).get();
+ txn.casSegmentWatermark(segment, new SegmentWatermark(3, 0), Optional.empty()).get();
+
+ txn.deleteAllSegmentState(segment).get();
+
+ List<String> remaining = new ArrayList<>(); // paths of any aborted records the cleanup missed
+ txn.scanAbortedTxns(segment,
+ TxnPaths.abortedByPositionSegmentLowerBound(segment),
+ TxnPaths.abortedByPositionSegmentUpperBound(segment),
+ new ScanConsumer() {
+ @Override
+ public void onNext(GetResult r) {
+ remaining.add(r.getStat().getPath());
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ }
+
+ @Override
+ public void onCompleted() {
+ }
+ }).get();
+ assertThat(remaining).isEmpty();
+ assertThat(txn.getSegmentWatermark(segment).get()).isEmpty();
+ }
+
// ---- helpers -----------------------------------------------------------
private static ScanConsumer collectHeaders(List<TxnHeader> out) {