This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 83064620fc6a7a62db9b669c950de53fc25d1f68 Author: Xiangying Meng <[email protected]> AuthorDate: Tue Apr 21 16:56:01 2026 +0800 [improve][common] Optimize TopicName.get() to reduce lock contention on cache lookup (#25367) ### Motivation `TopicName.get()` previously used `ConcurrentHashMap.computeIfAbsent()` to populate the topic-name cache. Although `computeIfAbsent` is atomic, it holds the internal bin-lock for the entire duration of the mapping function, which includes the non-trivial `TopicName` construction (string splitting, validation, etc.). Under high-concurrency workloads where many threads simultaneously encounter the same uncached topic name, this causes unnecessary lock contention and can degrade throughput. ### Modifications Replace `computeIfAbsent` with an explicit two-step pattern: 1. **Fast path**: call `cache.get(topic)` first — a single volatile read with no locking — and return immediately on a cache hit (steady-state case). 2. **Slow path** (cache miss): construct `TopicName` *outside* the lock, then use `cache.put()` to insert. (cherry picked from commit 8c4e83d2374a6431953684bfadbf5abc301dd26f) --- .../broker/naming/TopicNameGetBenchmark.java | 88 ++++++++++++++++++++++ .../apache/pulsar/broker/naming/package-info.java | 23 ++++++ .../org/apache/pulsar/common/naming/TopicName.java | 7 +- 3 files changed, 117 insertions(+), 1 deletion(-) diff --git a/microbench/src/main/java/org/apache/pulsar/broker/naming/TopicNameGetBenchmark.java b/microbench/src/main/java/org/apache/pulsar/broker/naming/TopicNameGetBenchmark.java new file mode 100644 index 00000000000..74188e16d3a --- /dev/null +++ b/microbench/src/main/java/org/apache/pulsar/broker/naming/TopicNameGetBenchmark.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.naming; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.pulsar.common.naming.TopicName; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * JMH benchmark for {@link TopicName#get(String)} cold-start (cache-miss) performance + * under 50-thread contention. + * + * <p>Uses {@code Mode.SingleShotTime} with {@code @Fork(10)} to measure + * the total time of a fixed batch of cold-start calls. No cache clearing is + * needed — each fork is a fresh JVM with an empty cache, and the batch size + * is bounded to avoid OOM. + * + * <p>Run with: + * <pre> + * ./gradlew :microbench:shadowJar + * java -jar microbench/build/libs/microbench-*-benchmarks.jar TopicNameGetBenchmark + * </pre> + */ +@Fork(10) +@BenchmarkMode(Mode.SingleShotTime) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@Warmup(iterations = 2) +@Measurement(iterations = 5) +@Threads(50) +@State(Scope.Thread) +public class TopicNameGetBenchmark { + + /** + * Each thread processes 10,000 unique topics per invocation. + * 50 threads × 10,000 = 500,000 total entries per invocation — well within memory. + */ + private static final int BATCH_SIZE = 10_000; + private static final AtomicInteger COUNTER = new AtomicInteger(); + + private String[] topics; + + @Setup(Level.Invocation) + public void prepare() { + int base = COUNTER.getAndAdd(BATCH_SIZE); + topics = new String[BATCH_SIZE]; + for (int i = 0; i < BATCH_SIZE; i++) { + topics[i] = "persistent://public/default/topic-" + (base + i); + } + } + + @Benchmark + @OperationsPerInvocation(BATCH_SIZE) + public void coldStartGet(Blackhole bh) { + for (int i = 0; i < BATCH_SIZE; i++) { + bh.consume(TopicName.get(topics[i])); + } + } +} diff --git a/microbench/src/main/java/org/apache/pulsar/broker/naming/package-info.java b/microbench/src/main/java/org/apache/pulsar/broker/naming/package-info.java new file mode 100644 index 00000000000..f655bf0490d --- /dev/null +++ b/microbench/src/main/java/org/apache/pulsar/broker/naming/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Benchmarks for {@link org.apache.pulsar.common.naming.TopicName#get(String)} cold-start (cache-miss) performance + */ +package org.apache.pulsar.broker.naming; \ No newline at end of file diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index 4d9b28df91b..f9177420a58 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -79,11 +79,16 @@ public class TopicName implements ServiceUnitId { } public static TopicName get(String topic) { + // Fast path: already cached — single volatile read, no lock. TopicName tp = cache.get(topic); if (tp != null) { return tp; } - return cache.computeIfAbsent(topic, k -> new TopicName(k)); + // Use get()+put() instead of computeIfAbsent() to keep construction outside the bin-lock. + // Duplicate instances from racing threads are safe to discard since TopicName is immutable. + TopicName newTp = new TopicName(topic); + TopicName existing = cache.put(topic, newTp); + return existing != null ? existing : newTp; } public static TopicName getPartitionedTopicName(String topic) {
