Copilot commented on code in PR #25384: URL: https://github.com/apache/pulsar/pull/25384#discussion_r3031236582
########## microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/MockBucketSnapshotStorage.java: ########## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed.bucket; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata; +import org.apache.pulsar.broker.delayed.proto.SnapshotSegment; + +public class MockBucketSnapshotStorage implements BucketSnapshotStorage { + + private final AtomicLong idGenerator = new AtomicLong(1); + private final Map<Long, SnapshotMetadata> snapshots = new ConcurrentHashMap<>(); + + @Override + public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata, + List<SnapshotSegment> bucketSnapshotSegments, + String bucketKey, String topicName, String cursorName) { + long id = idGenerator.getAndIncrement(); + snapshots.put(id, snapshotMetadata); + return CompletableFuture.completedFuture(id); + } + + @Override + public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) { + SnapshotMetadata metadata = snapshots.get(bucketId); + return CompletableFuture.completedFuture(metadata); + } + + @Override + public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, + long firstSegmentEntryId, + long lastSegmentEntryId) { + return CompletableFuture.completedFuture(Collections.emptyList()); + } Review Comment: `getBucketSnapshotSegment(...)` always returns an empty list. For buckets that have snapshot metadata indicating segments exist, returning empty prevents `ImmutableBucket` from advancing `currentSegmentEntryId`, and repeated loads may never make progress. The mock should return the previously stored segment data for the requested range (or fail the future if the segment is missing). ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/NoopDelayedDeliveryContext.java: ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.bookkeeper.mledger.ManagedCursor; + +public class NoopDelayedDeliveryContext implements DelayedDeliveryContext { + + private final String name; + private final ManagedCursor cursor; + private final AtomicInteger triggerCount = new AtomicInteger(); + private final Object triggerLock = new Object(); + + public NoopDelayedDeliveryContext(String name, ManagedCursor cursor) { + this.name = name; + this.cursor = cursor; + } + + @Override + public String getName() { + return name; + } + + @Override + public ManagedCursor getCursor() { + return cursor; + } + + @Override + public Object getTriggerLock() { + return triggerLock; + } + + @Override + public void triggerReadMoreEntries() { + // no-op; for tests/JMH + triggerCount.incrementAndGet(); + } + + public int getTriggerCount() { Review Comment: `getTriggerCount()` (and the backing counter) appears unused outside this class and increases the exported surface of a broker-module type intended for tests/JMH. Consider removing it, or at least restricting it (e.g., package-private / @VisibleForTesting) to avoid carrying a public API that production code shouldn't rely on. ```suggestion int getTriggerCount() { ``` ########## microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/MockBucketSnapshotStorage.java: ########## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed.bucket; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata; +import org.apache.pulsar.broker.delayed.proto.SnapshotSegment; + +public class MockBucketSnapshotStorage implements BucketSnapshotStorage { + + private final AtomicLong idGenerator = new AtomicLong(1); + private final Map<Long, SnapshotMetadata> snapshots = new ConcurrentHashMap<>(); + + @Override + public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata, + List<SnapshotSegment> bucketSnapshotSegments, + String bucketKey, String topicName, String cursorName) { + long id = idGenerator.getAndIncrement(); + snapshots.put(id, snapshotMetadata); + return CompletableFuture.completedFuture(id); + } Review Comment: `createBucketSnapshot(...)` ignores the provided `bucketSnapshotSegments`. Since `ImmutableBucket` later loads snapshot segments from `BucketSnapshotStorage`, the benchmark will not exercise realistic code paths (and can get stuck repeatedly loading an empty segment). Consider persisting the segments in-memory keyed by (bucketId, segmentEntryId) and returning them in `getBucketSnapshotSegment(...)`. ########## microbench/src/main/java/org/apache/pulsar/broker/package-info.java: ########## @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Microbenchmarks for delayed message delivery bucket implementation. + * + * <p>This package contains JMH benchmarks for testing the performance + * characteristics of the BucketDelayedDeliveryTracker, particularly + * focusing on thread safety improvements with ReentrantReadWriteLock. Review Comment: This package-info documents `org.apache.pulsar.broker`, but the delayed-delivery JMH benchmarks live under `org.apache.pulsar.broker.delayed.bucket` (which already has its own package-info). This is likely misplaced and will attach the Javadoc to the wrong package; consider removing this file or moving the documentation under the correct package. ```suggestion * Microbenchmarks for broker-related components. ``` ########## microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerBenchmark.java: ########## @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed.bucket; + +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timer; +import io.netty.util.concurrent.DefaultThreadFactory; +import java.time.Clock; +import java.util.NavigableSet; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.bookkeeper.mledger.impl.ActiveManagedCursorContainerImpl; +import org.apache.bookkeeper.mledger.impl.MockManagedCursor; +import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker; +import org.apache.pulsar.broker.delayed.NoopDelayedDeliveryContext; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +/** + * Enhanced JMH Benchmarks for BucketDelayedDeliveryTracker with ReentrantReadWriteLock. + * This benchmark tests the performance improvements made by transitioning from + * StampedLock to ReentrantReadWriteLock for fine-grained concurrency control. Review Comment: The Javadoc claims this benchmark targets a ReentrantReadWriteLock-based implementation and a transition from StampedLock, but the current `BucketDelayedDeliveryTracker` still uses `StampedLock`. Please update the description to match what is actually being benchmarked so the results are not misinterpreted. ```suggestion * JMH benchmarks for {@link BucketDelayedDeliveryTracker}. * This benchmark measures tracker throughput under different read/write ratios * and initial message counts so the current implementation can be evaluated * without implying a specific lock migration. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
