This is an automated email from the ASF dual-hosted git repository.

liangyepianzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c4e83d2374 [improve][common] Optimize TopicName.get() to reduce lock 
contention on cache lookup (#25367)
8c4e83d2374 is described below

commit 8c4e83d2374a6431953684bfadbf5abc301dd26f
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue Apr 21 16:56:01 2026 +0800

    [improve][common] Optimize TopicName.get() to reduce lock contention on 
cache lookup (#25367)
    
    ### Motivation
    
    `TopicName.get()` previously used `ConcurrentHashMap.computeIfAbsent()` to 
populate the topic-name cache. Although `computeIfAbsent` is atomic, it holds 
the internal bin-lock for the entire duration of the mapping function, which 
includes the non-trivial `TopicName` construction (string splitting, 
validation, etc.).
    
    Under high-concurrency workloads where many threads simultaneously 
encounter the same uncached topic name, this causes unnecessary lock contention 
and can degrade throughput.
    
    ### Modifications
    
    Replace `computeIfAbsent` with an explicit two-step pattern:
    
    1. **Fast path**: call `cache.get(topic)` first — a single volatile read 
with no locking — and return immediately on a cache hit (steady-state case).
    2. **Slow path** (cache miss): construct `TopicName` *outside* the lock, 
then use `cache.put()` to insert.
---
 .../broker/naming/TopicNameGetBenchmark.java       | 88 ++++++++++++++++++++++
 .../apache/pulsar/broker/naming/package-info.java  | 23 ++++++
 .../org/apache/pulsar/common/naming/TopicName.java |  7 +-
 3 files changed, 117 insertions(+), 1 deletion(-)

diff --git 
a/microbench/src/main/java/org/apache/pulsar/broker/naming/TopicNameGetBenchmark.java
 
b/microbench/src/main/java/org/apache/pulsar/broker/naming/TopicNameGetBenchmark.java
new file mode 100644
index 00000000000..74188e16d3a
--- /dev/null
+++ 
b/microbench/src/main/java/org/apache/pulsar/broker/naming/TopicNameGetBenchmark.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.naming;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pulsar.common.naming.TopicName;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+/**
+ * JMH benchmark for {@link TopicName#get(String)} cold-start (cache-miss) 
performance
+ * under 50-thread contention.
+ *
+ * <p>Uses {@code Mode.SingleShotTime} with {@code @Fork(10)} to measure
+ * the total time of a fixed batch of cold-start calls. No cache clearing is
+ * needed — each fork is a fresh JVM with an empty cache, and the batch size
+ * is bounded to avoid OOM.
+ *
+ * <p>Run with:
+ * <pre>
+ *   ./gradlew :microbench:shadowJar
+ *   java -jar microbench/build/libs/microbench-*-benchmarks.jar 
TopicNameGetBenchmark
+ * </pre>
+ */
+@Fork(10)
+@BenchmarkMode(Mode.SingleShotTime)
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+@Warmup(iterations = 2)
+@Measurement(iterations = 5)
+@Threads(50)
+@State(Scope.Thread)
+public class TopicNameGetBenchmark {
+
+    /**
+     * Each thread processes 10,000 unique topics per invocation.
+     * 50 threads × 10,000 = 500,000 total entries per invocation — well 
within memory.
+     */
+    private static final int BATCH_SIZE = 10_000;
+    private static final AtomicInteger COUNTER = new AtomicInteger();
+
+    private String[] topics;
+
+    @Setup(Level.Invocation)
+    public void prepare() {
+        int base = COUNTER.getAndAdd(BATCH_SIZE);
+        topics = new String[BATCH_SIZE];
+        for (int i = 0; i < BATCH_SIZE; i++) {
+            topics[i] = "persistent://public/default/topic-" + (base + i);
+        }
+    }
+
+    @Benchmark
+    @OperationsPerInvocation(BATCH_SIZE)
+    public void coldStartGet(Blackhole bh) {
+        for (int i = 0; i < BATCH_SIZE; i++) {
+            bh.consume(TopicName.get(topics[i]));
+        }
+    }
+}
diff --git 
a/microbench/src/main/java/org/apache/pulsar/broker/naming/package-info.java 
b/microbench/src/main/java/org/apache/pulsar/broker/naming/package-info.java
new file mode 100644
index 00000000000..f655bf0490d
--- /dev/null
+++ b/microbench/src/main/java/org/apache/pulsar/broker/naming/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Benchmarks for {@link 
org.apache.pulsar.common.naming.TopicName#get(String)} cold-start (cache-miss) 
performance
+ */
+package org.apache.pulsar.broker.naming;
\ No newline at end of file
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
index 55042f23634..83d694add67 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
@@ -78,11 +78,16 @@ public class TopicName implements ServiceUnitId {
     }
 
     public static TopicName get(String topic) {
+        // Fast path: already cached — single volatile read, no lock.
         TopicName tp = cache.get(topic);
         if (tp != null) {
             return tp;
         }
-        return cache.computeIfAbsent(topic, k -> new TopicName(k));
+        // Use get()+put() instead of computeIfAbsent() to keep construction 
outside the bin-lock.
+        // Duplicate instances from racing threads are safe to discard since 
TopicName is immutable.
+        TopicName newTp = new TopicName(topic);
+        TopicName existing = cache.put(topic, newTp);
+        return existing != null ? existing : newTp;
     }
 
     public static TopicName getPartitionedTopicName(String topic) {

Reply via email to