This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 5f347ee773c [improve][broker]Reduce the lock range of SimpleCache to
enhance performance (#25293)
5f347ee773c is described below
commit 5f347ee773c681dd418b7a280a640271494a4234
Author: fengyubiao <[email protected]>
AuthorDate: Thu Mar 26 14:52:04 2026 +0800
[improve][broker]Reduce the lock range of SimpleCache to enhance
performance (#25293)
(cherry picked from commit 9bbea3e2f2380fc6520af52c893a6f04abf3bd1d)
---
.../org/apache/pulsar/broker/PulsarService.java | 8 ++++---
.../SystemTopicTxnBufferSnapshotService.java | 28 ++++++++++++++++------
.../SingleSnapshotAbortedTxnProcessorImpl.java | 8 +++++--
.../SnapshotSegmentAbortedTxnProcessorImpl.java | 15 +++++++++---
.../TopicTransactionBufferRecoverTest.java | 8 ++++++-
.../pulsar/broker/transaction/TransactionTest.java | 4 +++-
6 files changed, 54 insertions(+), 17 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index d4b69f8d5fb..bf5dc8a04f6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -302,7 +302,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
private TransactionPendingAckStoreProvider
transactionPendingAckStoreProvider;
private final ExecutorProvider transactionExecutorProvider;
- private final ExecutorProvider transactionSnapshotRecoverExecutorProvider;
+ private final OrderedScheduler transactionSnapshotRecoverExecutorProvider;
private final MonotonicClock monotonicClock;
private String brokerId;
private final CompletableFuture<Void> readyForIncomingRequestsFuture = new
CompletableFuture<>();
@@ -373,8 +373,10 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
if (config.isTransactionCoordinatorEnabled()) {
this.transactionExecutorProvider = new
ExecutorProvider(this.getConfiguration()
.getNumTransactionReplayThreadPoolSize(),
"pulsar-transaction-executor");
- this.transactionSnapshotRecoverExecutorProvider = new
ExecutorProvider(this.getConfiguration()
- .getNumTransactionReplayThreadPoolSize(),
"pulsar-transaction-snapshot-recover");
+ this.transactionSnapshotRecoverExecutorProvider =
OrderedScheduler.newSchedulerBuilder()
+
.numThreads(this.getConfiguration().getNumTransactionReplayThreadPoolSize())
+ .name("pulsar-transaction-snapshot-recover")
+ .build();
} else {
this.transactionExecutorProvider = null;
this.transactionSnapshotRecoverExecutorProvider = null;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
index ba6cbee3557..3e893e8f26a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
@@ -21,8 +21,8 @@ package org.apache.pulsar.broker.service;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
-import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
@@ -41,13 +41,19 @@ public class SystemTopicTxnBufferSnapshotService<T> {
protected final ConcurrentHashMap<NamespaceName, SystemTopicClient<T>>
clients;
protected final NamespaceEventsSystemTopicFactory
namespaceEventsSystemTopicFactory;
+ protected final PulsarClientImpl pulsarClient;
protected final Class<T> schemaType;
protected final EventType systemTopicType;
private final ConcurrentHashMap<NamespaceName, ReferenceCountedWriter<T>>
refCountedWriterMap;
- @Getter
- private final TableView<T> tableView;
+
+ /** Only three SystemTopicTxnBufferSnapshotService instances are created, see
+ * {@link TransactionBufferSnapshotServiceFactory}. In addition,
each instance is only
+ * accessed by a fixed set of threads, see {@link
PulsarService#transactionSnapshotRecoverExecutorProvider}.
+ * So a non-static ThreadLocal is safe.
+ */
+ private final ThreadLocal<TableView<T>> tableViewThreadLocal = new
ThreadLocal<>();
// The class ReferenceCountedWriter will maintain the reference count,
// when the reference count decrement to 0, it will be removed from
writerFutureMap, the writer will be closed.
@@ -103,14 +109,12 @@ public class SystemTopicTxnBufferSnapshotService<T> {
public SystemTopicTxnBufferSnapshotService(PulsarService pulsar, EventType
systemTopicType,
Class<T> schemaType) throws
PulsarServerException {
- final var client = (PulsarClientImpl) pulsar.getClient();
- this.namespaceEventsSystemTopicFactory = new
NamespaceEventsSystemTopicFactory(client);
+ this.pulsarClient = (PulsarClientImpl) pulsar.getClient();
+ this.namespaceEventsSystemTopicFactory = new
NamespaceEventsSystemTopicFactory(pulsarClient);
this.systemTopicType = systemTopicType;
this.schemaType = schemaType;
this.clients = new ConcurrentHashMap<>();
this.refCountedWriterMap = new ConcurrentHashMap<>();
- this.tableView = new TableView<>(this::createReader,
- client.getConfiguration().getOperationTimeoutMs(),
pulsar.getExecutor());
}
public CompletableFuture<SystemTopicClient.Reader<T>>
createReader(TopicName topicName) {
@@ -173,4 +177,14 @@ public class SystemTopicTxnBufferSnapshotService<T> {
refCountedWriterMap.clear();
}
+ public TableView<T> getTableView(ScheduledExecutorService
scheduledExecutor) {
+ TableView<T> tableView = tableViewThreadLocal.get();
+ if (tableView == null) {
+ tableView = new TableView<>(this::createReader,
+ pulsarClient.getConfiguration().getOperationTimeoutMs(),
scheduledExecutor);
+ tableViewThreadLocal.set(tableView);
+ }
+ return tableView;
+ }
+
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
index c737da2ed0e..5d360a5822d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.transaction.buffer.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
@@ -91,10 +92,13 @@ public class SingleSnapshotAbortedTxnProcessorImpl
implements AbortedTxnProcesso
public CompletableFuture<Position> recoverFromSnapshot() {
final var future = new CompletableFuture<Position>();
final var pulsar = topic.getBrokerService().getPulsar();
-
pulsar.getTransactionSnapshotRecoverExecutorProvider().getExecutor(this).execute(()
-> {
+ final String namespace = TopicName.get(topic.getName()).getNamespace();
+ final ScheduledExecutorService scheduledExecutor =
pulsar.getTransactionSnapshotRecoverExecutorProvider()
+ .chooseThread(namespace);
+ scheduledExecutor.execute(() -> {
try {
final var snapshot =
pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService()
- .getTableView().readLatest(topic.getName());
+
.getTableView(scheduledExecutor).readLatest(topic.getName());
if (snapshot != null) {
handleSnapshot(snapshot);
final var startReadCursorPosition =
PositionFactory.create(snapshot.getMaxReadPositionLedgerId(),
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
index 98d8a40eb36..8fa917ab7e6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
@@ -28,6 +28,7 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -231,10 +232,14 @@ public class SnapshotSegmentAbortedTxnProcessorImpl
implements AbortedTxnProcess
public CompletableFuture<Position> recoverFromSnapshot() {
final var pulsar = topic.getBrokerService().getPulsar();
final var future = new CompletableFuture<Position>();
-
pulsar.getTransactionSnapshotRecoverExecutorProvider().getExecutor(this).execute(()
-> {
+ final String namespace = TopicName.get(topic.getName()).getNamespace();
+ final ScheduledExecutorService scheduledExecutor =
pulsar.getTransactionSnapshotRecoverExecutorProvider()
+ .chooseThread(namespace);
+ scheduledExecutor.execute(() -> {
try {
final var indexes =
pulsar.getTransactionBufferSnapshotServiceFactory()
-
.getTxnBufferSnapshotIndexService().getTableView().readLatest(topic.getName());
+
.getTxnBufferSnapshotIndexService().getTableView(scheduledExecutor)
+ .readLatest(topic.getName());
if (indexes == null) {
// Try recovering from the old format snapshot
future.complete(recoverOldSnapshot());
@@ -342,6 +347,10 @@ public class SnapshotSegmentAbortedTxnProcessorImpl
implements AbortedTxnProcess
// This method will be deprecated and removed in version 4.x.0
private Position recoverOldSnapshot() throws Exception {
+ final String namespace = TopicName.get(topic.getName()).getNamespace();
+ final ScheduledExecutorService scheduledExecutor =
topic.getBrokerService().getPulsar()
+ .getTransactionSnapshotRecoverExecutorProvider()
+ .chooseThread(namespace);
final var pulsar = topic.getBrokerService().getPulsar();
final var topicName = TopicName.get(topic.getName());
final var topics =
wait(pulsar.getPulsarResources().getTopicResources().listPersistentTopicsAsync(
@@ -351,7 +360,7 @@ public class SnapshotSegmentAbortedTxnProcessorImpl
implements AbortedTxnProcess
return null;
}
final var snapshot =
pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService()
- .getTableView().readLatest(topic.getName());
+ .getTableView(scheduledExecutor).readLatest(topic.getName());
if (snapshot == null) {
return null;
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index 149d73aede6..97161122c81 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -44,6 +44,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -592,6 +593,10 @@ public class TopicTransactionBufferRecoverTest extends
TransactionTestBase {
@Test(timeOut = 30000)
public void testTransactionBufferRecoverThrowException() throws Exception {
+ OrderedScheduler scheduler = OrderedScheduler.newSchedulerBuilder()
+ .numThreads(1)
+ .name("pulsar-transaction-snapshot-recover")
+ .build();
String topic = NAMESPACE1 +
"/testTransactionBufferRecoverThrowPulsarClientException";
@Cleanup
Producer<byte[]> producer = pulsarClient
@@ -620,7 +625,8 @@ public class TopicTransactionBufferRecoverTest extends
TransactionTestBase {
doReturn(CompletableFuture.completedFuture(reader))
.when(systemTopicTxnBufferSnapshotService).createReader(any());
doReturn(refCounterWriter).when(systemTopicTxnBufferSnapshotService).getReferenceWriter(any());
- doReturn(new
MockTableView(pulsarServiceList.get(0))).when(systemTopicTxnBufferSnapshotService).getTableView();
+ doReturn(new
MockTableView(pulsarServiceList.get(0))).when(systemTopicTxnBufferSnapshotService)
+ .getTableView(scheduler);
TransactionBufferSnapshotServiceFactory
transactionBufferSnapshotServiceFactory =
mock(TransactionBufferSnapshotServiceFactory.class);
doReturn(systemTopicTxnBufferSnapshotService)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 6f1b5627586..ee292750263 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -1117,9 +1117,11 @@ public class TransactionTest extends TransactionTestBase
{
@Test
public void testNotChangeMaxReadPositionCountWhenCheckIfNoSnapshot()
throws Exception {
+ final String topic = NAMESPACE1 + "/changeMaxReadPositionCount" +
UUID.randomUUID();
+ pulsarClient.newProducer().topic(topic).create().close();
PersistentTopic persistentTopic = (PersistentTopic)
getPulsarServiceList().get(0)
.getBrokerService()
- .getTopic(NAMESPACE1 + "/changeMaxReadPositionCount" +
UUID.randomUUID(), true)
+ .getTopic(topic, true)
.get().get();
TransactionBuffer buffer = persistentTopic.getTransactionBuffer();
Field processorField =
TopicTransactionBuffer.class.getDeclaredField("snapshotAbortedTxnProcessor");