This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 5f347ee773c [improve][broker]Reduce the lock range of SimpleCache to enhance performance (#25293)
5f347ee773c is described below

commit 5f347ee773c681dd418b7a280a640271494a4234
Author: fengyubiao <[email protected]>
AuthorDate: Thu Mar 26 14:52:04 2026 +0800

    [improve][broker]Reduce the lock range of SimpleCache to enhance performance (#25293)
    
    (cherry picked from commit 9bbea3e2f2380fc6520af52c893a6f04abf3bd1d)
---
 .../org/apache/pulsar/broker/PulsarService.java    |  8 ++++---
 .../SystemTopicTxnBufferSnapshotService.java       | 28 ++++++++++++++++------
 .../SingleSnapshotAbortedTxnProcessorImpl.java     |  8 +++++--
 .../SnapshotSegmentAbortedTxnProcessorImpl.java    | 15 +++++++++---
 .../TopicTransactionBufferRecoverTest.java         |  8 ++++++-
 .../pulsar/broker/transaction/TransactionTest.java |  4 +++-
 6 files changed, 54 insertions(+), 17 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index d4b69f8d5fb..bf5dc8a04f6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -302,7 +302,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
 
     private TransactionPendingAckStoreProvider 
transactionPendingAckStoreProvider;
     private final ExecutorProvider transactionExecutorProvider;
-    private final ExecutorProvider transactionSnapshotRecoverExecutorProvider;
+    private final OrderedScheduler transactionSnapshotRecoverExecutorProvider;
     private final MonotonicClock monotonicClock;
     private String brokerId;
     private final CompletableFuture<Void> readyForIncomingRequestsFuture = new 
CompletableFuture<>();
@@ -373,8 +373,10 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
         if (config.isTransactionCoordinatorEnabled()) {
             this.transactionExecutorProvider = new 
ExecutorProvider(this.getConfiguration()
                     .getNumTransactionReplayThreadPoolSize(), 
"pulsar-transaction-executor");
-            this.transactionSnapshotRecoverExecutorProvider = new 
ExecutorProvider(this.getConfiguration()
-                    .getNumTransactionReplayThreadPoolSize(), 
"pulsar-transaction-snapshot-recover");
+            this.transactionSnapshotRecoverExecutorProvider = 
OrderedScheduler.newSchedulerBuilder()
+                    
.numThreads(this.getConfiguration().getNumTransactionReplayThreadPoolSize())
+                    .name("pulsar-transaction-snapshot-recover")
+                    .build();
         } else {
             this.transactionExecutorProvider = null;
             this.transactionSnapshotRecoverExecutorProvider = null;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
index ba6cbee3557..3e893e8f26a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
@@ -21,8 +21,8 @@ package org.apache.pulsar.broker.service;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
-import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
@@ -41,13 +41,19 @@ public class SystemTopicTxnBufferSnapshotService<T> {
 
     protected final ConcurrentHashMap<NamespaceName, SystemTopicClient<T>> 
clients;
     protected final NamespaceEventsSystemTopicFactory 
namespaceEventsSystemTopicFactory;
+    protected final PulsarClientImpl pulsarClient;
 
     protected final Class<T> schemaType;
     protected final EventType systemTopicType;
 
     private final ConcurrentHashMap<NamespaceName, ReferenceCountedWriter<T>> 
refCountedWriterMap;
-    @Getter
-    private final TableView<T> tableView;
+
+    /** Only three SystemTopicTxnBufferSnapshotService instances are created, see
+     *  {@link TransactionBufferSnapshotServiceFactory}. Each instance is only accessed by a fixed set of
+     * threads, see {@link PulsarService#transactionSnapshotRecoverExecutorProvider}.
+     * So a non-static ThreadLocal is safe here.
+     */
+    private final ThreadLocal<TableView<T>> tableViewThreadLocal = new 
ThreadLocal<>();
 
     // The class ReferenceCountedWriter will maintain the reference count,
     // when the reference count decrement to 0, it will be removed from 
writerFutureMap, the writer will be closed.
@@ -103,14 +109,12 @@ public class SystemTopicTxnBufferSnapshotService<T> {
 
     public SystemTopicTxnBufferSnapshotService(PulsarService pulsar, EventType 
systemTopicType,
                                                Class<T> schemaType) throws 
PulsarServerException {
-        final var client = (PulsarClientImpl) pulsar.getClient();
-        this.namespaceEventsSystemTopicFactory = new 
NamespaceEventsSystemTopicFactory(client);
+        this.pulsarClient = (PulsarClientImpl) pulsar.getClient();
+        this.namespaceEventsSystemTopicFactory = new 
NamespaceEventsSystemTopicFactory(pulsarClient);
         this.systemTopicType = systemTopicType;
         this.schemaType = schemaType;
         this.clients = new ConcurrentHashMap<>();
         this.refCountedWriterMap = new ConcurrentHashMap<>();
-        this.tableView = new TableView<>(this::createReader,
-                client.getConfiguration().getOperationTimeoutMs(), 
pulsar.getExecutor());
     }
 
     public CompletableFuture<SystemTopicClient.Reader<T>> 
createReader(TopicName topicName) {
@@ -173,4 +177,14 @@ public class SystemTopicTxnBufferSnapshotService<T> {
         refCountedWriterMap.clear();
     }
 
+    public TableView<T> getTableView(ScheduledExecutorService 
scheduledExecutor) {
+        TableView<T> tableView = tableViewThreadLocal.get();
+        if (tableView == null) {
+            tableView = new TableView<>(this::createReader,
+                    pulsarClient.getConfiguration().getOperationTimeoutMs(), 
scheduledExecutor);
+            tableViewThreadLocal.set(tableView);
+        }
+        return tableView;
+    }
+
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
index c737da2ed0e..5d360a5822d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.transaction.buffer.impl;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.PositionFactory;
@@ -91,10 +92,13 @@ public class SingleSnapshotAbortedTxnProcessorImpl 
implements AbortedTxnProcesso
     public CompletableFuture<Position> recoverFromSnapshot() {
         final var future = new CompletableFuture<Position>();
         final var pulsar = topic.getBrokerService().getPulsar();
-        
pulsar.getTransactionSnapshotRecoverExecutorProvider().getExecutor(this).execute(()
 -> {
+        final String namespace = TopicName.get(topic.getName()).getNamespace();
+        final ScheduledExecutorService scheduledExecutor = 
pulsar.getTransactionSnapshotRecoverExecutorProvider()
+                .chooseThread(namespace);
+        scheduledExecutor.execute(() -> {
             try {
                 final var snapshot = 
pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService()
-                        .getTableView().readLatest(topic.getName());
+                        
.getTableView(scheduledExecutor).readLatest(topic.getName());
                 if (snapshot != null) {
                     handleSnapshot(snapshot);
                     final var startReadCursorPosition = 
PositionFactory.create(snapshot.getMaxReadPositionLedgerId(),
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
index 98d8a40eb36..8fa917ab7e6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
@@ -28,6 +28,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -231,10 +232,14 @@ public class SnapshotSegmentAbortedTxnProcessorImpl 
implements AbortedTxnProcess
     public CompletableFuture<Position> recoverFromSnapshot() {
         final var pulsar = topic.getBrokerService().getPulsar();
         final var future = new CompletableFuture<Position>();
-        
pulsar.getTransactionSnapshotRecoverExecutorProvider().getExecutor(this).execute(()
 -> {
+        final String namespace = TopicName.get(topic.getName()).getNamespace();
+        final ScheduledExecutorService scheduledExecutor = 
pulsar.getTransactionSnapshotRecoverExecutorProvider()
+                .chooseThread(namespace);
+        scheduledExecutor.execute(() -> {
             try {
                 final var indexes = 
pulsar.getTransactionBufferSnapshotServiceFactory()
-                        
.getTxnBufferSnapshotIndexService().getTableView().readLatest(topic.getName());
+                        
.getTxnBufferSnapshotIndexService().getTableView(scheduledExecutor)
+                        .readLatest(topic.getName());
                 if (indexes == null) {
                     // Try recovering from the old format snapshot
                     future.complete(recoverOldSnapshot());
@@ -342,6 +347,10 @@ public class SnapshotSegmentAbortedTxnProcessorImpl 
implements AbortedTxnProcess
 
     // This method will be deprecated and removed in version 4.x.0
     private Position recoverOldSnapshot() throws Exception {
+        final String namespace = TopicName.get(topic.getName()).getNamespace();
+        final ScheduledExecutorService scheduledExecutor = 
topic.getBrokerService().getPulsar()
+                .getTransactionSnapshotRecoverExecutorProvider()
+                .chooseThread(namespace);
         final var pulsar = topic.getBrokerService().getPulsar();
         final var topicName = TopicName.get(topic.getName());
         final var topics = 
wait(pulsar.getPulsarResources().getTopicResources().listPersistentTopicsAsync(
@@ -351,7 +360,7 @@ public class SnapshotSegmentAbortedTxnProcessorImpl 
implements AbortedTxnProcess
             return null;
         }
         final var snapshot = 
pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService()
-                .getTableView().readLatest(topic.getName());
+                .getTableView(scheduledExecutor).readLatest(topic.getName());
         if (snapshot == null) {
             return null;
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index 149d73aede6..97161122c81 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -44,6 +44,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -592,6 +593,10 @@ public class TopicTransactionBufferRecoverTest extends 
TransactionTestBase {
 
     @Test(timeOut = 30000)
     public void testTransactionBufferRecoverThrowException() throws Exception {
+        OrderedScheduler scheduler = OrderedScheduler.newSchedulerBuilder()
+                .numThreads(1)
+                .name("pulsar-transaction-snapshot-recover")
+                .build();
         String topic = NAMESPACE1 + 
"/testTransactionBufferRecoverThrowPulsarClientException";
         @Cleanup
         Producer<byte[]> producer = pulsarClient
@@ -620,7 +625,8 @@ public class TopicTransactionBufferRecoverTest extends 
TransactionTestBase {
         doReturn(CompletableFuture.completedFuture(reader))
                 .when(systemTopicTxnBufferSnapshotService).createReader(any());
         
doReturn(refCounterWriter).when(systemTopicTxnBufferSnapshotService).getReferenceWriter(any());
-        doReturn(new 
MockTableView(pulsarServiceList.get(0))).when(systemTopicTxnBufferSnapshotService).getTableView();
+        doReturn(new 
MockTableView(pulsarServiceList.get(0))).when(systemTopicTxnBufferSnapshotService)
+                .getTableView(scheduler);
         TransactionBufferSnapshotServiceFactory 
transactionBufferSnapshotServiceFactory =
                 mock(TransactionBufferSnapshotServiceFactory.class);
         doReturn(systemTopicTxnBufferSnapshotService)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 6f1b5627586..ee292750263 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -1117,9 +1117,11 @@ public class TransactionTest extends TransactionTestBase 
{
 
     @Test
     public void testNotChangeMaxReadPositionCountWhenCheckIfNoSnapshot() 
throws Exception {
+        final String topic = NAMESPACE1 + "/changeMaxReadPositionCount" + 
UUID.randomUUID();
+        pulsarClient.newProducer().topic(topic).create().close();
         PersistentTopic persistentTopic = (PersistentTopic) 
getPulsarServiceList().get(0)
                 .getBrokerService()
-                .getTopic(NAMESPACE1 + "/changeMaxReadPositionCount" + 
UUID.randomUUID(), true)
+                .getTopic(topic, true)
                 .get().get();
         TransactionBuffer buffer = persistentTopic.getTransactionBuffer();
         Field processorField = 
TopicTransactionBuffer.class.getDeclaredField("snapshotAbortedTxnProcessor");

Reply via email to