This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 301d6559563 [fix][txn] PIP-473: prune terminal transactions from the
metadata buffer cache (#25960)
301d6559563 is described below
commit 301d65595633ab625bea788cb16bf2bb51ec499c
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Jun 8 09:13:41 2026 -0700
[fix][txn] PIP-473: prune terminal transactions from the metadata buffer
cache (#25960)
---
.../buffer/impl/MetadataTransactionBuffer.java | 15 ++++++++
.../buffer/impl/MetadataTransactionBufferTest.java | 42 ++++++++++++++++++++++
2 files changed, 57 insertions(+)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
index 4ac6b89ae51..099bea5f211 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.transaction.buffer.impl;
+import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.HashMap;
@@ -434,6 +435,12 @@ public class MetadataTransactionBuffer implements
TransactionBuffer {
// it (e.g. after an in-memory rebuild that lost the set).
abortedTxns.add(txnIdKey);
}
+ // Drop the now-terminal entry so the cache stays bounded by the open-txn count rather
+ // than growing for the segment's lifetime. This is safe: recomputeMaxReadPositionLocked
+ // only consults OPEN entries, and isTxnAborted reads the separate abortedTxns set (an
+ // aborted txn stays there, a committed/unknown one correctly reads as visible). The
+ // positions needed for the durable side-effects below were already captured above.
+ txns.remove(txnIdKey);
}
// Persist aborted record if this is an abort.
@@ -704,6 +711,14 @@ public class MetadataTransactionBuffer implements
TransactionBuffer {
}
}
+ /** Size of the in-memory per-txn cache; bounded by the open-txn count once terminals are pruned. */
+ @VisibleForTesting
+ int trackedTxnCount() {
+ synchronized (lock) {
+ return txns.size();
+ }
+ }
+
@Override
public long getAbortedTxnCount() {
return abortedCount.sum();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
index 8234eea3832..ec3476916a9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
@@ -157,6 +157,48 @@ public class MetadataTransactionBufferTest {
});
}
+ @Test
+ public void terminalTxns_prunedFromCache_visibilityUnchanged() throws
Exception {
+ // Resolve many transactions (mixed commit/abort) and confirm the in-memory per-txn cache is
+ // pruned back to empty rather than growing for the segment's lifetime, while visibility stays
+ // correct: aborted txns remain filtered (via the durable aborted set) and committed/unknown
+ // txns remain visible.
+ MetadataTransactionBuffer tb = new MetadataTransactionBuffer(topic,
txnStore);
+ tb.checkIfTBRecoverCompletely().get();
+
+ int n = 20;
+ TxnID lastAborted = null;
+ Position lastAbortedPos = null;
+ TxnID lastCommitted = null;
+ for (int i = 1; i <= n; i++) {
+ TxnID txnId = new TxnID(1, i);
+ createOpenHeader(txnId);
+ Position p = tb.appendBufferToTxn(txnId, 0, payload("v" +
i)).get();
+ if (i % 2 == 0) {
+ commitTxn(txnId);
+ txnStore.publishSegmentEvent(SEGMENT, new
TxnEvent(TxnIds.toKey(txnId), TxnState.COMMITTED)).get();
+ lastCommitted = txnId;
+ } else {
+ abortTxn(txnId);
+ txnStore.publishSegmentEvent(SEGMENT, new
TxnEvent(TxnIds.toKey(txnId), TxnState.ABORTED)).get();
+ lastAborted = txnId;
+ lastAbortedPos = p;
+ }
+ }
+
+ // Once every txn is terminal, the cache holds nothing (no OPEN txns remain).
+ Awaitility.await().untilAsserted(() -> {
+ assertThat(tb.getOngoingTxnCount()).isZero();
+ assertThat(tb.trackedTxnCount()).isZero();
+ });
+
+ // Visibility correctness survives pruning.
+ assertThat(tb.isTxnAborted(lastAborted, lastAbortedPos)).isTrue();
+ assertThat(tb.isTxnAborted(lastCommitted, PositionFactory.create(1,
0))).isFalse();
+ assertThat(tb.getCommittedTxnCount()).isEqualTo(n / 2);
+ assertThat(tb.getAbortedTxnCount()).isEqualTo(n / 2);
+ }
+
@Test
public void appendToCommittedTxn_failsTxnConflict() throws Exception {
TxnID txnId = new TxnID(1, 1);