Copilot commented on code in PR #25384:
URL: https://github.com/apache/pulsar/pull/25384#discussion_r3031236582


##########
microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/MockBucketSnapshotStorage.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
+
+public class MockBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private final AtomicLong idGenerator = new AtomicLong(1);
+    private final Map<Long, SnapshotMetadata> snapshots = new 
ConcurrentHashMap<>();
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata 
snapshotMetadata,
+                                                        List<SnapshotSegment> 
bucketSnapshotSegments,
+                                                        String bucketKey, 
String topicName, String cursorName) {
+        long id = idGenerator.getAndIncrement();
+        snapshots.put(id, snapshotMetadata);
+        return CompletableFuture.completedFuture(id);
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long 
bucketId) {
+        SnapshotMetadata metadata = snapshots.get(bucketId);
+        return CompletableFuture.completedFuture(metadata);
+    }
+
+    @Override
+    public CompletableFuture<List<SnapshotSegment>> 
getBucketSnapshotSegment(long bucketId,
+                                                                             
long firstSegmentEntryId,
+                                                                             
long lastSegmentEntryId) {
+        return CompletableFuture.completedFuture(Collections.emptyList());
+    }

Review Comment:
   `getBucketSnapshotSegment(...)` always returns an empty list. For buckets 
that have snapshot metadata indicating segments exist, returning empty prevents 
`ImmutableBucket` from advancing `currentSegmentEntryId`, and repeated loads 
may never make progress. The mock should return the previously stored segment 
data for the requested range (or fail the future if the segment is missing).



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/NoopDelayedDeliveryContext.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+
+public class NoopDelayedDeliveryContext implements DelayedDeliveryContext {
+
+    private final String name;
+    private final ManagedCursor cursor;
+    private final AtomicInteger triggerCount = new AtomicInteger();
+    private final Object triggerLock = new Object();
+
+    public NoopDelayedDeliveryContext(String name, ManagedCursor cursor) {
+        this.name = name;
+        this.cursor = cursor;
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public ManagedCursor getCursor() {
+        return cursor;
+    }
+
+    @Override
+    public Object getTriggerLock() {
+        return triggerLock;
+    }
+
+    @Override
+    public void triggerReadMoreEntries() {
+        // no-op; for tests/JMH
+        triggerCount.incrementAndGet();
+    }
+
+    public int getTriggerCount() {

Review Comment:
   `getTriggerCount()` (and the backing counter) appears unused outside this 
class and increases the exported surface of a broker-module type intended for 
tests/JMH. Consider removing it, or at least restricting it (e.g., 
package-private / @VisibleForTesting) to avoid carrying a public API that 
production code shouldn't rely on.
   ```suggestion
       int getTriggerCount() {
   ```



##########
microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/MockBucketSnapshotStorage.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
+
+public class MockBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private final AtomicLong idGenerator = new AtomicLong(1);
+    private final Map<Long, SnapshotMetadata> snapshots = new 
ConcurrentHashMap<>();
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata 
snapshotMetadata,
+                                                        List<SnapshotSegment> 
bucketSnapshotSegments,
+                                                        String bucketKey, 
String topicName, String cursorName) {
+        long id = idGenerator.getAndIncrement();
+        snapshots.put(id, snapshotMetadata);
+        return CompletableFuture.completedFuture(id);
+    }

Review Comment:
   `createBucketSnapshot(...)` ignores the provided `bucketSnapshotSegments`. 
Since `ImmutableBucket` later loads snapshot segments from 
`BucketSnapshotStorage`, the benchmark will not exercise realistic code paths 
(and can get stuck repeatedly loading an empty segment). Consider persisting 
the segments in-memory keyed by (bucketId, segmentEntryId) and returning them 
in `getBucketSnapshotSegment(...)`.



##########
microbench/src/main/java/org/apache/pulsar/broker/package-info.java:
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Microbenchmarks for delayed message delivery bucket implementation.
+ *
+ * <p>This package contains JMH benchmarks for testing the performance
+ * characteristics of the BucketDelayedDeliveryTracker, particularly
+ * focusing on thread safety improvements with ReentrantReadWriteLock.

Review Comment:
   This package-info documents `org.apache.pulsar.broker`, but the 
delayed-delivery JMH benchmarks live under 
`org.apache.pulsar.broker.delayed.bucket` (which already has its own 
package-info). This is likely misplaced and will attach the Javadoc to the 
wrong package; consider removing this file or moving the documentation under 
the correct package.
   ```suggestion
    * Microbenchmarks for broker-related components.
   ```



##########
microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerBenchmark.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.time.Clock;
+import java.util.NavigableSet;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.bookkeeper.mledger.impl.ActiveManagedCursorContainerImpl;
+import org.apache.bookkeeper.mledger.impl.MockManagedCursor;
+import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
+import org.apache.pulsar.broker.delayed.NoopDelayedDeliveryContext;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * Enhanced JMH Benchmarks for BucketDelayedDeliveryTracker with 
ReentrantReadWriteLock.
+ * This benchmark tests the performance improvements made by transitioning from
+ * StampedLock to ReentrantReadWriteLock for fine-grained concurrency control.

Review Comment:
   The Javadoc claims this benchmark targets a ReentrantReadWriteLock-based 
implementation and a transition from StampedLock, but the current 
`BucketDelayedDeliveryTracker` still uses `StampedLock`. Please update the 
description to match what is actually being benchmarked so the results are not 
misinterpreted.
   ```suggestion
    * JMH benchmarks for {@link BucketDelayedDeliveryTracker}.
    * This benchmark measures tracker throughput under different read/write 
ratios
    * and initial message counts so the current implementation can be evaluated
    * without implying a specific lock migration.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to