This is an automated email from the ASF dual-hosted git repository.
liangyepianzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8c4e83d2374 [improve][common] Optimize TopicName.get() to reduce lock
contention on cache lookup (#25367)
8c4e83d2374 is described below
commit 8c4e83d2374a6431953684bfadbf5abc301dd26f
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue Apr 21 16:56:01 2026 +0800
[improve][common] Optimize TopicName.get() to reduce lock contention on
cache lookup (#25367)
### Motivation
`TopicName.get()` previously used `ConcurrentHashMap.computeIfAbsent()` to
populate the topic-name cache. Although `computeIfAbsent` is atomic, it holds
the internal bin-lock for the entire duration of the mapping function, which
includes the non-trivial `TopicName` construction (string splitting,
validation, etc.).
Under high-concurrency workloads where many threads simultaneously
encounter the same uncached topic name, this causes unnecessary lock contention
and can degrade throughput.
### Modifications
Replace `computeIfAbsent` with an explicit two-step pattern:
1. **Fast path**: call `cache.get(topic)` first — a single volatile read
with no locking — and return immediately on a cache hit (steady-state case).
2. **Slow path** (cache miss): construct `TopicName` *outside* the lock,
then use `cache.put()` to insert.
---
.../broker/naming/TopicNameGetBenchmark.java | 88 ++++++++++++++++++++++
.../apache/pulsar/broker/naming/package-info.java | 23 ++++++
.../org/apache/pulsar/common/naming/TopicName.java | 7 +-
3 files changed, 117 insertions(+), 1 deletion(-)
diff --git
a/microbench/src/main/java/org/apache/pulsar/broker/naming/TopicNameGetBenchmark.java
b/microbench/src/main/java/org/apache/pulsar/broker/naming/TopicNameGetBenchmark.java
new file mode 100644
index 00000000000..74188e16d3a
--- /dev/null
+++
b/microbench/src/main/java/org/apache/pulsar/broker/naming/TopicNameGetBenchmark.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.naming;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pulsar.common.naming.TopicName;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+/**
+ * JMH benchmark for {@link TopicName#get(String)} cold-start (cache-miss)
performance
+ * under 50-thread contention.
+ *
+ * <p>Uses {@code Mode.SingleShotTime} with {@code @Fork(10)} to measure
+ * the total time of a fixed batch of cold-start calls. No cache clearing is
+ * needed — each fork is a fresh JVM with an empty cache, and the batch size
+ * is bounded to avoid OOM.
+ *
+ * <p>Run with:
+ * <pre>
+ * ./gradlew :microbench:shadowJar
+ * java -jar microbench/build/libs/microbench-*-benchmarks.jar
TopicNameGetBenchmark
+ * </pre>
+ */
+@Fork(10)
+@BenchmarkMode(Mode.SingleShotTime)
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+@Warmup(iterations = 2)
+@Measurement(iterations = 5)
+@Threads(50)
+@State(Scope.Thread)
+public class TopicNameGetBenchmark {
+
+ /**
+ * Each thread processes 10,000 unique topics per invocation.
+ * 50 threads × 10,000 = 500,000 total entries per invocation — well
within memory.
+ */
+ private static final int BATCH_SIZE = 10_000;
+ private static final AtomicInteger COUNTER = new AtomicInteger();
+
+ private String[] topics;
+
+ @Setup(Level.Invocation)
+ public void prepare() {
+ int base = COUNTER.getAndAdd(BATCH_SIZE);
+ topics = new String[BATCH_SIZE];
+ for (int i = 0; i < BATCH_SIZE; i++) {
+ topics[i] = "persistent://public/default/topic-" + (base + i);
+ }
+ }
+
+ @Benchmark
+ @OperationsPerInvocation(BATCH_SIZE)
+ public void coldStartGet(Blackhole bh) {
+ for (int i = 0; i < BATCH_SIZE; i++) {
+ bh.consume(TopicName.get(topics[i]));
+ }
+ }
+}
diff --git
a/microbench/src/main/java/org/apache/pulsar/broker/naming/package-info.java
b/microbench/src/main/java/org/apache/pulsar/broker/naming/package-info.java
new file mode 100644
index 00000000000..f655bf0490d
--- /dev/null
+++ b/microbench/src/main/java/org/apache/pulsar/broker/naming/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Benchmarks for {@link
org.apache.pulsar.common.naming.TopicName#get(String)} cold-start (cache-miss)
performance
+ */
+package org.apache.pulsar.broker.naming;
\ No newline at end of file
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
index 55042f23634..83d694add67 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
@@ -78,11 +78,16 @@ public class TopicName implements ServiceUnitId {
}
public static TopicName get(String topic) {
+ // Fast path: already cached — single volatile read, no lock.
TopicName tp = cache.get(topic);
if (tp != null) {
return tp;
}
- return cache.computeIfAbsent(topic, k -> new TopicName(k));
+ // Use get()+put() instead of computeIfAbsent() to keep construction
outside the bin-lock.
+ // Duplicate instances from racing threads are safe to discard since
TopicName is immutable.
+ TopicName newTp = new TopicName(topic);
+ TopicName existing = cache.put(topic, newTp);
+ return existing != null ? existing : newTp;
}
public static TopicName getPartitionedTopicName(String topic) {