This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a10fe9befc [refactor][broker] Decouple delayed delivery trackers from 
dispatcher (#25384)
4a10fe9befc is described below

commit 4a10fe9befc28ab05a383a3258dc859559a732c1
Author: sinan liu <[email protected]>
AuthorDate: Tue Apr 7 11:33:22 2026 +0800

    [refactor][broker] Decouple delayed delivery trackers from dispatcher 
(#25384)
---
 .../bookkeeper/mledger/impl/MockManagedCursor.java |   5 +-
 .../BucketDelayedDeliveryTrackerBenchmark.java     | 321 ++++++++++++++++
 ...ucketDelayedDeliveryTrackerSimpleBenchmark.java | 407 ---------------------
 .../delayed/bucket/MockBucketSnapshotStorage.java  | 106 ++++++
 .../delayed/AbstractDelayedDeliveryTracker.java    |  31 +-
 .../broker/delayed/DelayedDeliveryContext.java     |  32 ++
 .../delayed/DispatcherDelayedDeliveryContext.java  |  51 +++
 .../delayed/InMemoryDelayedDeliveryTracker.java    |  21 +-
 .../broker/delayed/NoopDelayedDeliveryContext.java |  53 +++
 .../bucket/BucketDelayedDeliveryTracker.java       |  63 ++--
 10 files changed, 643 insertions(+), 447 deletions(-)

diff --git 
a/microbench/src/main/java/org/apache/bookkeeper/mledger/impl/MockManagedCursor.java
 
b/microbench/src/main/java/org/apache/bookkeeper/mledger/impl/MockManagedCursor.java
index af602a8a9fe..927b4a1c929 100644
--- 
a/microbench/src/main/java/org/apache/bookkeeper/mledger/impl/MockManagedCursor.java
+++ 
b/microbench/src/main/java/org/apache/bookkeeper/mledger/impl/MockManagedCursor.java
@@ -36,7 +36,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
 
-class MockManagedCursor implements ManagedCursor {
+public class MockManagedCursor implements ManagedCursor {
     ActiveManagedCursorContainer container;
     Position markDeletePosition;
     Position readPosition;
@@ -58,7 +58,8 @@ class MockManagedCursor implements ManagedCursor {
         this.durable = durable;
     }
 
-    static MockManagedCursor createCursor(ActiveManagedCursorContainer 
container, String name, Position position) {
+    public static MockManagedCursor createCursor(ActiveManagedCursorContainer 
container, String name,
+                                                Position position) {
         return new MockManagedCursor(container, name, position, position, 
false, true);
     }
 
diff --git 
a/microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerBenchmark.java
 
b/microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerBenchmark.java
new file mode 100644
index 00000000000..08d02195c87
--- /dev/null
+++ 
b/microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerBenchmark.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.time.Clock;
+import java.util.NavigableSet;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.bookkeeper.mledger.impl.ActiveManagedCursorContainerImpl;
+import org.apache.bookkeeper.mledger.impl.MockManagedCursor;
+import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
+import org.apache.pulsar.broker.delayed.NoopDelayedDeliveryContext;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * JMH benchmarks for {@link BucketDelayedDeliveryTracker}.
+ *
+ * <p>This benchmark measures tracker throughput under different read/write 
ratios
+ * and initial message counts without implying a specific lock implementation.
+ *
+ * <p>Run with: mvn exec:java -Dexec.mainClass="org.openjdk.jmh.Main"
+ *           -Dexec.args="BucketDelayedDeliveryTrackerBenchmark"
+ */
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.SECONDS)
+@State(Scope.Benchmark)
+@Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
+@Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
+@Fork(1)
+public class BucketDelayedDeliveryTrackerBenchmark {
+
+    /**
+     * Fixed delivery timestamp base that stays far beyond any benchmark trial 
duration,
+     * so scheduled tasks will not start firing while throughput is being 
measured.
+     */
+    private static final long FUTURE_DELIVERY_BASE_TIME_MILLIS = 
4102444800000L; // 2100-01-01T00:00:00Z
+
+    @Param({"90_10", "80_20", "70_30", "50_50"})
+    public String readWriteRatio;
+
+    @Param({"1000", "5000", "8000"})
+    public int initialMessages;
+
+    private BucketDelayedDeliveryTracker tracker;
+    private Timer timer;
+    private MockBucketSnapshotStorage storage;
+    private NoopDelayedDeliveryContext context;
+    private AtomicLong messageIdGenerator;
+    private int readPercentage;
+    private long futureDeliveryBaseTimeMillis;
+    /**
+     * Maximum number of additional unique (ledgerId, entryId) positions to
+     * introduce per trial on top of {@link #initialMessages}. This allows
+     * controlling the memory footprint of the benchmark while still applying
+     * sustained write pressure to the tracker.
+     *
+     * <p>Use {@code -p maxAdditionalUniqueMessages=...} on the JMH command 
line
+     * to tune the load. The default value is conservative for local runs.</p>
+     */
+    @Param({"1000000"})
+    public long maxAdditionalUniqueMessages;
+    /**
+     * Upper bound on the absolute message id that will be used to derive
+     * (ledgerId, entryId) positions during a single trial.
+     */
+    private long maxUniqueMessageId;
+    /**
+     * In real Pulsar usage, {@link DelayedDeliveryTracker#addMessage(long, 
long, long)} is invoked
+     * by a single dispatcher thread and messages arrive in order of 
(ledgerId, entryId).
+     * <p>
+     * To reflect this invariant in the benchmark, all write operations that 
end up calling
+     * {@code tracker.addMessage(...)} are serialized via this mutex so that 
the tracker only
+     * ever observes a single writer with monotonically increasing ids, even 
when JMH runs the
+     * benchmark method with multiple threads.
+     */
+    private final Object writeMutex = new Object();
+
+    @Setup(Level.Trial)
+    public void setup() throws Exception {
+        setupMockComponents();
+        createTracker();
+        String[] parts = readWriteRatio.split("_");
+        readPercentage = Integer.parseInt(parts[0]);
+        futureDeliveryBaseTimeMillis = FUTURE_DELIVERY_BASE_TIME_MILLIS;
+        preloadMessages();
+        messageIdGenerator = new AtomicLong(initialMessages + 1);
+        // Allow a bounded number of additional unique messages per trial to 
avoid
+        // unbounded memory growth while still stressing the indexing logic.
+        maxUniqueMessageId = initialMessages + maxAdditionalUniqueMessages;
+    }
+
+    @TearDown(Level.Trial)
+    public void tearDown() throws Exception {
+        if (tracker != null) {
+            tracker.close();
+        }
+        if (timer != null) {
+            timer.stop();
+        }
+    }
+
+    private void setupMockComponents() throws Exception {
+        timer = new HashedWheelTimer(new 
DefaultThreadFactory("test-delayed-delivery"), 100, TimeUnit.MILLISECONDS);
+        storage = new MockBucketSnapshotStorage();
+
+        ActiveManagedCursorContainerImpl container = new 
ActiveManagedCursorContainerImpl();
+        MockManagedCursor cursor = MockManagedCursor.createCursor(container, 
"test-cursor",
+                PositionFactory.create(0, 0));
+        // Use the same "<topic> / <cursor>" naming pattern as real 
dispatchers,
+        // so that Bucket.asyncSaveBucketSnapshot can correctly derive 
topicName.
+        String dispatcherName = "persistent://public/default/jmh-topic / " + 
cursor.getName();
+        context = new NoopDelayedDeliveryContext(dispatcherName, cursor);
+    }
+
+    private void createTracker() throws Exception {
+        tracker = new BucketDelayedDeliveryTracker(
+                context, timer, 1000, Clock.systemUTC(), true, storage,
+                20, 1000, 100, 50
+        );
+    }
+
+    private void preloadMessages() {
+        // Preload messages to create realistic test conditions while keeping
+        // delivery timestamps far beyond the benchmark trial duration so the
+        // tracker's timer does not start firing during measurement.
+        long baseTime = futureDeliveryBaseTimeMillis;
+        for (int i = 1; i <= initialMessages; i++) {
+            tracker.addMessage(i, i, baseTime + i * 1000L);
+        }
+    }
+
+    // 
=============================================================================
+    // READ-WRITE RATIO BENCHMARKS
+    // 
=============================================================================
+
+    @Benchmark
+    public boolean benchmarkMixedOperations() {
+        if (ThreadLocalRandom.current().nextInt(100) < readPercentage) {
+            // Read operations
+            return performReadOperation();
+        } else {
+            // Write operations
+            return performWriteOperation();
+        }
+    }
+
+    /**
+     * Serialize calls to {@link BucketDelayedDeliveryTracker#addMessage(long, 
long, long)} and
+     * ensure (ledgerId, entryId) are generated in a strictly increasing 
sequence, matching the
+     * real dispatcher single-threaded behaviour.
+     */
+    private boolean addMessageSequential(long deliverAt, int entryIdModulo) {
+        synchronized (writeMutex) {
+            long id = messageIdGenerator.getAndIncrement();
+            // Limit the number of distinct positions that are introduced into 
the tracker
+            // to keep memory usage bounded. Once the upper bound is reached, 
we re-use
+            // the last position id so that subsequent calls behave like 
updates to
+            // existing messages and are short-circuited by containsMessage 
checks.
+            long boundedId = Math.min(id, maxUniqueMessageId);
+            long ledgerId = boundedId;
+            long entryId = boundedId % entryIdModulo;
+            return tracker.addMessage(ledgerId, entryId, deliverAt);
+        }
+    }
+
+    private boolean performReadOperation() {
+        int operation = ThreadLocalRandom.current().nextInt(3);
+        switch (operation) {
+            case 0:
+                // containsMessage
+                long ledgerId = ThreadLocalRandom.current().nextLong(1, 
initialMessages + 100);
+                long entryId = ThreadLocalRandom.current().nextLong(1, 1000);
+                return tracker.containsMessage(ledgerId, entryId);
+            case 1:
+                // nextDeliveryTime
+                try {
+                    tracker.nextDeliveryTime();
+                    return true;
+                } catch (Exception e) {
+                    return false;
+                }
+            case 2:
+                // getNumberOfDelayedMessages
+                long count = tracker.getNumberOfDelayedMessages();
+                return count >= 0;
+            default:
+                return false;
+        }
+    }
+
+    private boolean performWriteOperation() {
+        long deliverAt = futureDeliveryBaseTimeMillis + 
ThreadLocalRandom.current().nextLong(5000, 30000);
+        return addMessageSequential(deliverAt, 1000);
+    }
+
+    // 
=============================================================================
+    // SPECIFIC OPERATION BENCHMARKS
+    // 
=============================================================================
+
+    @Benchmark
+    @Threads(8)
+    public boolean benchmarkConcurrentContainsMessage() {
+        long ledgerId = ThreadLocalRandom.current().nextLong(1, 
initialMessages + 100);
+        long entryId = ThreadLocalRandom.current().nextLong(1, 1000);
+        return tracker.containsMessage(ledgerId, entryId);
+    }
+
+    @Benchmark
+    @Threads(4)
+    public boolean benchmarkConcurrentAddMessage() {
+        long deliverAt = futureDeliveryBaseTimeMillis + 
ThreadLocalRandom.current().nextLong(10000, 60000);
+        return addMessageSequential(deliverAt, 1000);
+    }
+
+    @Benchmark
+    @Threads(2)
+    public NavigableSet<Position> benchmarkConcurrentGetScheduledMessages() {
+        // Create some messages ready for delivery
+        long currentTime = System.currentTimeMillis();
+        for (int i = 0; i < 5; i++) {
+            addMessageSequential(currentTime - 1000, 100);
+        }
+        return tracker.getScheduledMessages(10);
+    }
+
+    @Benchmark
+    @Threads(16)
+    public long benchmarkConcurrentNextDeliveryTime() {
+        try {
+            return tracker.nextDeliveryTime();
+        } catch (Exception e) {
+            return -1;
+        }
+    }
+
+    @Benchmark
+    @Threads(1)
+    public long benchmarkGetNumberOfDelayedMessages() {
+        return tracker.getNumberOfDelayedMessages();
+    }
+
+    // 
=============================================================================
+    // HIGH CONTENTION SCENARIOS
+    // 
=============================================================================
+
+    @Benchmark
+    @Threads(32)
+    public boolean benchmarkHighContentionMixedOperations() {
+        return benchmarkMixedOperations();
+    }
+
+    @Benchmark
+    @Threads(16)
+    public boolean benchmarkContentionReads() {
+        return performReadOperation();
+    }
+
+    @Benchmark
+    @Threads(8)
+    public boolean benchmarkContentionWrites() {
+        return performWriteOperation();
+    }
+
+    // 
=============================================================================
+    // THROUGHPUT BENCHMARKS
+    // 
=============================================================================
+
+    @Benchmark
+    @Threads(1)
+    public boolean benchmarkSingleThreadedThroughput() {
+        return benchmarkMixedOperations();
+    }
+
+    @Benchmark
+    @Threads(4)
+    public boolean benchmarkMediumConcurrencyThroughput() {
+        return benchmarkMixedOperations();
+    }
+
+    @Benchmark
+    @Threads(8)
+    public boolean benchmarkHighConcurrencyThroughput() {
+        return benchmarkMixedOperations();
+    }
+
+}
diff --git 
a/microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerSimpleBenchmark.java
 
b/microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerSimpleBenchmark.java
deleted file mode 100644
index 8e8b0878793..00000000000
--- 
a/microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerSimpleBenchmark.java
+++ /dev/null
@@ -1,407 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.delayed.bucket;
-
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.StampedLock;
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.BenchmarkMode;
-import org.openjdk.jmh.annotations.Fork;
-import org.openjdk.jmh.annotations.Level;
-import org.openjdk.jmh.annotations.Measurement;
-import org.openjdk.jmh.annotations.Mode;
-import org.openjdk.jmh.annotations.OutputTimeUnit;
-import org.openjdk.jmh.annotations.Param;
-import org.openjdk.jmh.annotations.Scope;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.annotations.TearDown;
-import org.openjdk.jmh.annotations.Threads;
-import org.openjdk.jmh.annotations.Warmup;
-
-/**
- * Simplified JMH Benchmarks for BucketDelayedDeliveryTracker thread safety 
improvements.
- * This benchmark focuses on the core StampedLock optimistic read performance 
without
- * complex dependencies on the full BucketDelayedDeliveryTracker 
implementation.
- * Run with: java -jar microbench/build/libs/microbenchmarks.jar 
BucketDelayedDeliveryTrackerSimpleBenchmark
- */
-@BenchmarkMode(Mode.Throughput)
-@OutputTimeUnit(TimeUnit.SECONDS)
-@State(Scope.Benchmark)
-@Warmup(iterations = 5, time = 1)
-@Measurement(iterations = 5, time = 1)
-@Fork(1)
-public class BucketDelayedDeliveryTrackerSimpleBenchmark {
-
-    @Param({"1", "2", "4", "8", "16"})
-    public int threadCount;
-
-    private StampedLock stampedLock;
-    private boolean testData = true;
-    private volatile long counter = 0;
-
-    @Setup(Level.Trial)
-    public void setup() throws Exception {
-        stampedLock = new StampedLock();
-    }
-
-    @TearDown(Level.Trial)
-    public void tearDown() throws Exception {
-        // Cleanup if needed
-    }
-
-    // 
=============================================================================
-    // STAMPED LOCK OPTIMISTIC READ BENCHMARKS
-    // 
=============================================================================
-
-    @Benchmark
-    @Threads(1)
-    public boolean benchmarkOptimisticReadSingleThreaded() {
-        // Simulate optimistic read like in containsMessage()
-        long stamp = stampedLock.tryOptimisticRead();
-        boolean result = testData; // Simulate reading shared data
-
-        if (!stampedLock.validate(stamp)) {
-            stamp = stampedLock.readLock();
-            try {
-                result = testData;
-            } finally {
-                stampedLock.unlockRead(stamp);
-            }
-        }
-        return result;
-    }
-
-    @Benchmark
-    @Threads(2)
-    public boolean benchmarkOptimisticReadMultiThreaded() {
-        long stamp = stampedLock.tryOptimisticRead();
-        boolean result = testData;
-
-        if (!stampedLock.validate(stamp)) {
-            stamp = stampedLock.readLock();
-            try {
-                result = testData;
-            } finally {
-                stampedLock.unlockRead(stamp);
-            }
-        }
-        return result;
-    }
-
-    @Benchmark
-    @Threads(8)
-    public boolean benchmarkOptimisticReadHighConcurrency() {
-        long stamp = stampedLock.tryOptimisticRead();
-        boolean result = testData;
-
-        if (!stampedLock.validate(stamp)) {
-            stamp = stampedLock.readLock();
-            try {
-                result = testData;
-            } finally {
-                stampedLock.unlockRead(stamp);
-            }
-        }
-        return result;
-    }
-
-    @Benchmark
-    @Threads(16)
-    public boolean benchmarkOptimisticReadExtremeConcurrency() {
-        long stamp = stampedLock.tryOptimisticRead();
-        boolean result = testData;
-
-        if (!stampedLock.validate(stamp)) {
-            stamp = stampedLock.readLock();
-            try {
-                result = testData;
-            } finally {
-                stampedLock.unlockRead(stamp);
-            }
-        }
-        return result;
-    }
-
-    // 
=============================================================================
-    // READ:WRITE RATIO BENCHMARKS (as requested)
-    // 
=============================================================================
-
-    @Benchmark
-    @Threads(4)
-    public boolean benchmarkReadWrite10_90() {
-        // 10:90 read:write ratio simulation
-        if (ThreadLocalRandom.current().nextInt(100) < 10) {
-            // Read operation
-            long stamp = stampedLock.tryOptimisticRead();
-            boolean result = testData;
-
-            if (!stampedLock.validate(stamp)) {
-                stamp = stampedLock.readLock();
-                try {
-                    result = testData;
-                } finally {
-                    stampedLock.unlockRead(stamp);
-                }
-            }
-            return result;
-        } else {
-            // Write operation
-            long stamp = stampedLock.writeLock();
-            try {
-                testData = !testData;
-                counter++;
-                return testData;
-            } finally {
-                stampedLock.unlockWrite(stamp);
-            }
-        }
-    }
-
-    @Benchmark
-    @Threads(4)
-    public boolean benchmarkReadWrite20_80() {
-        // 20:80 read:write ratio
-        if (ThreadLocalRandom.current().nextInt(100) < 20) {
-            long stamp = stampedLock.tryOptimisticRead();
-            boolean result = testData;
-
-            if (!stampedLock.validate(stamp)) {
-                stamp = stampedLock.readLock();
-                try {
-                    result = testData;
-                } finally {
-                    stampedLock.unlockRead(stamp);
-                }
-            }
-            return result;
-        } else {
-            long stamp = stampedLock.writeLock();
-            try {
-                testData = !testData;
-                counter++;
-                return testData;
-            } finally {
-                stampedLock.unlockWrite(stamp);
-            }
-        }
-    }
-
-    @Benchmark
-    @Threads(4)
-    public boolean benchmarkReadWrite40_60() {
-        // 40:60 read:write ratio
-        if (ThreadLocalRandom.current().nextInt(100) < 40) {
-            long stamp = stampedLock.tryOptimisticRead();
-            boolean result = testData;
-
-            if (!stampedLock.validate(stamp)) {
-                stamp = stampedLock.readLock();
-                try {
-                    result = testData;
-                } finally {
-                    stampedLock.unlockRead(stamp);
-                }
-            }
-            return result;
-        } else {
-            long stamp = stampedLock.writeLock();
-            try {
-                testData = !testData;
-                counter++;
-                return testData;
-            } finally {
-                stampedLock.unlockWrite(stamp);
-            }
-        }
-    }
-
-    @Benchmark
-    @Threads(4)
-    public boolean benchmarkReadWrite50_50() {
-        // 50:50 read:write ratio
-        if (ThreadLocalRandom.current().nextInt(100) < 50) {
-            long stamp = stampedLock.tryOptimisticRead();
-            boolean result = testData;
-
-            if (!stampedLock.validate(stamp)) {
-                stamp = stampedLock.readLock();
-                try {
-                    result = testData;
-                } finally {
-                    stampedLock.unlockRead(stamp);
-                }
-            }
-            return result;
-        } else {
-            long stamp = stampedLock.writeLock();
-            try {
-                testData = !testData;
-                counter++;
-                return testData;
-            } finally {
-                stampedLock.unlockWrite(stamp);
-            }
-        }
-    }
-
-    @Benchmark
-    @Threads(4)
-    public boolean benchmarkReadWrite60_40() {
-        // 60:40 read:write ratio
-        if (ThreadLocalRandom.current().nextInt(100) < 60) {
-            long stamp = stampedLock.tryOptimisticRead();
-            boolean result = testData;
-
-            if (!stampedLock.validate(stamp)) {
-                stamp = stampedLock.readLock();
-                try {
-                    result = testData;
-                } finally {
-                    stampedLock.unlockRead(stamp);
-                }
-            }
-            return result;
-        } else {
-            long stamp = stampedLock.writeLock();
-            try {
-                testData = !testData;
-                counter++;
-                return testData;
-            } finally {
-                stampedLock.unlockWrite(stamp);
-            }
-        }
-    }
-
-    @Benchmark
-    @Threads(4)
-    public boolean benchmarkReadWrite80_20() {
-        // 80:20 read:write ratio
-        if (ThreadLocalRandom.current().nextInt(100) < 80) {
-            long stamp = stampedLock.tryOptimisticRead();
-            boolean result = testData;
-
-            if (!stampedLock.validate(stamp)) {
-                stamp = stampedLock.readLock();
-                try {
-                    result = testData;
-                } finally {
-                    stampedLock.unlockRead(stamp);
-                }
-            }
-            return result;
-        } else {
-            long stamp = stampedLock.writeLock();
-            try {
-                testData = !testData;
-                counter++;
-                return testData;
-            } finally {
-                stampedLock.unlockWrite(stamp);
-            }
-        }
-    }
-
-    @Benchmark
-    @Threads(4)
-    public boolean benchmarkReadWrite90_10() {
-        // 90:10 read:write ratio - most realistic for production
-        if (ThreadLocalRandom.current().nextInt(100) < 90) {
-            long stamp = stampedLock.tryOptimisticRead();
-            boolean result = testData;
-
-            if (!stampedLock.validate(stamp)) {
-                stamp = stampedLock.readLock();
-                try {
-                    result = testData;
-                } finally {
-                    stampedLock.unlockRead(stamp);
-                }
-            }
-            return result;
-        } else {
-            long stamp = stampedLock.writeLock();
-            try {
-                testData = !testData;
-                counter++;
-                return testData;
-            } finally {
-                stampedLock.unlockWrite(stamp);
-            }
-        }
-    }
-
-    // 
=============================================================================
-    // HIGH CONCURRENCY SCENARIOS
-    // 
=============================================================================
-
-    @Benchmark
-    @Threads(8)
-    public boolean benchmarkReadWrite90_10_HighConcurrency() {
-        // 90:10 read:write ratio with high concurrency
-        if (ThreadLocalRandom.current().nextInt(100) < 90) {
-            long stamp = stampedLock.tryOptimisticRead();
-            boolean result = testData;
-
-            if (!stampedLock.validate(stamp)) {
-                stamp = stampedLock.readLock();
-                try {
-                    result = testData;
-                } finally {
-                    stampedLock.unlockRead(stamp);
-                }
-            }
-            return result;
-        } else {
-            long stamp = stampedLock.writeLock();
-            try {
-                testData = !testData;
-                counter++;
-                return testData;
-            } finally {
-                stampedLock.unlockWrite(stamp);
-            }
-        }
-    }
-
-    @Benchmark
-    @Threads(16)
-    public boolean benchmarkOptimisticReadContention() {
-        // High contention scenario to test optimistic read fallback behavior
-        long stamp = stampedLock.tryOptimisticRead();
-        boolean result = testData;
-
-        // Simulate some computation
-        if (ThreadLocalRandom.current().nextInt(1000) == 0) {
-            Thread.yield(); // Occasionally yield to increase contention
-        }
-
-        if (!stampedLock.validate(stamp)) {
-            stamp = stampedLock.readLock();
-            try {
-                result = testData;
-            } finally {
-                stampedLock.unlockRead(stamp);
-            }
-        }
-        return result;
-    }
-}
\ No newline at end of file
diff --git 
a/microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/MockBucketSnapshotStorage.java
 
b/microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/MockBucketSnapshotStorage.java
new file mode 100644
index 00000000000..c89071d31a0
--- /dev/null
+++ 
b/microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/MockBucketSnapshotStorage.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
+
+public class MockBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private final AtomicLong idGenerator = new AtomicLong(1);
+    private final Map<Long, SnapshotMetadata> snapshots = new 
ConcurrentHashMap<>();
+    private final Map<Long, List<SnapshotSegment>> snapshotSegments = new 
ConcurrentHashMap<>();
+    private final Map<Long, Long> snapshotLengths = new ConcurrentHashMap<>();
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata 
snapshotMetadata,
+                                                        List<SnapshotSegment> 
bucketSnapshotSegments,
+                                                        String bucketKey, 
String topicName, String cursorName) {
+        long id = idGenerator.getAndIncrement();
+        snapshots.put(id, snapshotMetadata);
+        snapshotSegments.put(id, new ArrayList<>(bucketSnapshotSegments));
+        long snapshotLength = snapshotMetadata.toByteArray().length;
+        for (SnapshotSegment bucketSnapshotSegment : bucketSnapshotSegments) {
+            snapshotLength += bucketSnapshotSegment.toByteArray().length;
+        }
+        snapshotLengths.put(id, snapshotLength);
+        return CompletableFuture.completedFuture(id);
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long 
bucketId) {
+        SnapshotMetadata metadata = snapshots.get(bucketId);
+        return CompletableFuture.completedFuture(metadata);
+    }
+
+    @Override
+    public CompletableFuture<List<SnapshotSegment>> 
getBucketSnapshotSegment(long bucketId,
+                                                                             
long firstSegmentEntryId,
+                                                                             
long lastSegmentEntryId) {
+        List<SnapshotSegment> segments = snapshotSegments.get(bucketId);
+        if (segments == null) {
+            return CompletableFuture.failedFuture(
+                    new IllegalArgumentException("Bucket snapshot segments not 
found: " + bucketId));
+        }
+        if (firstSegmentEntryId > lastSegmentEntryId) {
+            return CompletableFuture.completedFuture(Collections.emptyList());
+        }
+
+        int fromIndex = Math.toIntExact(firstSegmentEntryId - 1);
+        int toIndex = Math.toIntExact(lastSegmentEntryId);
+        if (fromIndex < 0 || fromIndex >= segments.size()) {
+            return CompletableFuture.failedFuture(
+                    new IllegalArgumentException("Invalid first segment entry 
id: " + firstSegmentEntryId));
+        }
+        toIndex = Math.min(toIndex, segments.size());
+        return CompletableFuture.completedFuture(new 
ArrayList<>(segments.subList(fromIndex, toIndex)));
+    }
+
+    @Override
+    public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
+        return 
CompletableFuture.completedFuture(snapshotLengths.getOrDefault(bucketId, 0L));
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
+        snapshots.remove(bucketId);
+        snapshotSegments.remove(bucketId);
+        snapshotLengths.remove(bucketId);
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public void start() throws Exception {
+        // No-op
+    }
+
+    @Override
+    public void close() throws Exception {
+        snapshots.clear();
+        snapshotSegments.clear();
+        snapshotLengths.clear();
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/AbstractDelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/AbstractDelayedDeliveryTracker.java
index bec5134c4f7..47753335db8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/AbstractDelayedDeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/AbstractDelayedDeliveryTracker.java
@@ -29,7 +29,7 @@ import 
org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherM
 @Slf4j
 public abstract class AbstractDelayedDeliveryTracker implements 
DelayedDeliveryTracker, TimerTask {
 
-    protected final AbstractPersistentDispatcherMultipleConsumers dispatcher;
+    protected final DelayedDeliveryContext context;
 
     // Reference to the shared (per-broker) timer for delayed delivery
     protected final Timer timer;
@@ -48,24 +48,39 @@ public abstract class AbstractDelayedDeliveryTracker 
implements DelayedDeliveryT
     protected final Clock clock;
 
     private final boolean isDelayedDeliveryDeliverAtTimeStrict;
+    private final Object triggerLock;
 
     public 
AbstractDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers 
dispatcher, Timer timer,
                                           long tickTimeMillis,
                                           boolean 
isDelayedDeliveryDeliverAtTimeStrict) {
-        this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), 
isDelayedDeliveryDeliverAtTimeStrict);
+        this(new DispatcherDelayedDeliveryContext(dispatcher), timer, 
tickTimeMillis,
+                Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict);
     }
 
     public 
AbstractDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers 
dispatcher, Timer timer,
                                           long tickTimeMillis, Clock clock,
                                           boolean 
isDelayedDeliveryDeliverAtTimeStrict) {
-        this.dispatcher = dispatcher;
+        this(new DispatcherDelayedDeliveryContext(dispatcher), timer, 
tickTimeMillis,
+                clock, isDelayedDeliveryDeliverAtTimeStrict);
+    }
+
+    public AbstractDelayedDeliveryTracker(DelayedDeliveryContext context, 
Timer timer,
+                                          long tickTimeMillis,
+                                          boolean 
isDelayedDeliveryDeliverAtTimeStrict) {
+        this(context, timer, tickTimeMillis, Clock.systemUTC(), 
isDelayedDeliveryDeliverAtTimeStrict);
+    }
+
+    public AbstractDelayedDeliveryTracker(DelayedDeliveryContext context, 
Timer timer,
+                                          long tickTimeMillis, Clock clock,
+                                          boolean 
isDelayedDeliveryDeliverAtTimeStrict) {
+        this.context = context;
+        this.triggerLock = context.getTriggerLock();
         this.timer = timer;
         this.tickTimeMillis = tickTimeMillis;
         this.clock = clock;
         this.isDelayedDeliveryDeliverAtTimeStrict = 
isDelayedDeliveryDeliverAtTimeStrict;
     }
 
-
     /**
      * When {@link #isDelayedDeliveryDeliverAtTimeStrict} is false, we allow 
for early delivery by as much as the
      * {@link #tickTimeMillis} because it is a slight optimization to let 
messages skip going back into the delay
@@ -124,7 +139,7 @@ public abstract class AbstractDelayedDeliveryTracker 
implements DelayedDeliveryT
         long calculatedDelayMillis = Math.max(delayMillis, 
remainingTickDelayMillis);
 
         if (log.isDebugEnabled()) {
-            log.debug("[{}] Start timer in {} millis", dispatcher.getName(), 
calculatedDelayMillis);
+            log.debug("[{}] Start timer in {} millis", context.getName(), 
calculatedDelayMillis);
         }
 
         // Even though we may delay longer than this timestamp because of the 
tick delay, we still track the
@@ -136,17 +151,17 @@ public abstract class AbstractDelayedDeliveryTracker 
implements DelayedDeliveryT
     @Override
     public void run(Timeout timeout) throws Exception {
         if (log.isDebugEnabled()) {
-            log.debug("[{}] Timer triggered", dispatcher.getName());
+            log.debug("[{}] Timer triggered", context.getName());
         }
         if (timeout == null || timeout.isCancelled()) {
             return;
         }
 
-        synchronized (dispatcher) {
+        synchronized (triggerLock) {
             lastTickRun = clock.millis();
             currentTimeoutTarget = -1;
             this.timeout = null;
-            dispatcher.readMoreEntriesAsync();
+            context.triggerReadMoreEntries();
         }
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryContext.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryContext.java
new file mode 100644
index 00000000000..a94d5258f19
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryContext.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed;
+
+import org.apache.bookkeeper.mledger.ManagedCursor;
+
+public interface DelayedDeliveryContext {
+
+    String getName();
+
+    ManagedCursor getCursor();
+
+    Object getTriggerLock();
+
+    void triggerReadMoreEntries();
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DispatcherDelayedDeliveryContext.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DispatcherDelayedDeliveryContext.java
new file mode 100644
index 00000000000..2ea388d504c
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DispatcherDelayedDeliveryContext.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed;
+
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import 
org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
+
+public class DispatcherDelayedDeliveryContext implements 
DelayedDeliveryContext {
+
+    private final AbstractPersistentDispatcherMultipleConsumers dispatcher;
+
+    public 
DispatcherDelayedDeliveryContext(AbstractPersistentDispatcherMultipleConsumers 
dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    @Override
+    public String getName() {
+        return dispatcher.getName();
+    }
+
+    @Override
+    public ManagedCursor getCursor() {
+        return dispatcher.getCursor();
+    }
+
+    @Override
+    public Object getTriggerLock() {
+        return dispatcher;
+    }
+
+    @Override
+    public void triggerReadMoreEntries() {
+        dispatcher.readMoreEntriesAsync();
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index 9c58f0a228a..02be50d7e9c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -68,15 +68,24 @@ public class InMemoryDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTrack
                                    long tickTimeMillis,
                                    boolean 
isDelayedDeliveryDeliverAtTimeStrict,
                                    long fixedDelayDetectionLookahead) {
-        this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), 
isDelayedDeliveryDeliverAtTimeStrict,
-                fixedDelayDetectionLookahead);
+        this(new DispatcherDelayedDeliveryContext(dispatcher), timer, 
tickTimeMillis, Clock.systemUTC(),
+                isDelayedDeliveryDeliverAtTimeStrict, 
fixedDelayDetectionLookahead);
     }
 
+    @VisibleForTesting
     public 
InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers 
dispatcher, Timer timer,
                                           long tickTimeMillis, Clock clock,
                                           boolean 
isDelayedDeliveryDeliverAtTimeStrict,
                                           long fixedDelayDetectionLookahead) {
-        super(dispatcher, timer, tickTimeMillis, clock, 
isDelayedDeliveryDeliverAtTimeStrict);
+        this(new DispatcherDelayedDeliveryContext(dispatcher), timer, 
tickTimeMillis, clock,
+                isDelayedDeliveryDeliverAtTimeStrict, 
fixedDelayDetectionLookahead);
+    }
+
+    private InMemoryDelayedDeliveryTracker(DelayedDeliveryContext context, 
Timer timer,
+                                           long tickTimeMillis, Clock clock,
+                                           boolean 
isDelayedDeliveryDeliverAtTimeStrict,
+                                           long fixedDelayDetectionLookahead) {
+        super(context, timer, tickTimeMillis, clock, 
isDelayedDeliveryDeliverAtTimeStrict);
         this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
         this.timestampPrecisionBitCnt = 
calculateTimestampPrecisionBitCnt(tickTimeMillis);
     }
@@ -117,7 +126,7 @@ public class InMemoryDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTrack
         }
 
         if (log.isDebugEnabled()) {
-            log.debug("[{}] Add message {}:{} -- Delivery in {} ms ", 
dispatcher.getName(), ledgerId, entryId,
+            log.debug("[{}] Add message {}:{} -- Delivery in {} ms ", 
context.getName(), ledgerId, entryId,
                     deliverAt - clock.millis());
         }
 
@@ -209,7 +218,7 @@ public class InMemoryDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTrack
         }
 
         if (log.isDebugEnabled()) {
-            log.debug("[{}] Get scheduled messages - found {}", 
dispatcher.getName(), positions.size());
+            log.debug("[{}] Get scheduled messages - found {}", 
context.getName(), positions.size());
         }
 
         if (delayedMessageMap.isEmpty()) {
@@ -218,7 +227,7 @@ public class InMemoryDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTrack
             messagesHaveFixedDelay = true;
             if (delayedMessagesCount.get() != 0) {
                 log.warn("[{}] Delayed message tracker is empty, but 
delayedMessagesCount is {}",
-                        dispatcher.getName(), delayedMessagesCount.get());
+                        context.getName(), delayedMessagesCount.get());
             }
         }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/NoopDelayedDeliveryContext.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/NoopDelayedDeliveryContext.java
new file mode 100644
index 00000000000..826e127df00
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/NoopDelayedDeliveryContext.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed;
+
+import org.apache.bookkeeper.mledger.ManagedCursor;
+
+public class NoopDelayedDeliveryContext implements DelayedDeliveryContext {
+
+    private final String name;
+    private final ManagedCursor cursor;
+    private final Object triggerLock = new Object();
+
+    public NoopDelayedDeliveryContext(String name, ManagedCursor cursor) {
+        this.name = name;
+        this.cursor = cursor;
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public ManagedCursor getCursor() {
+        return cursor;
+    }
+
+    @Override
+    public Object getTriggerLock() {
+        return triggerLock;
+    }
+
+    @Override
+    public void triggerReadMoreEntries() {
+        // no-op; for tests/JMH
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index 91964f0eb90..d963f60311c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -55,6 +55,8 @@ import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker;
+import org.apache.pulsar.broker.delayed.DelayedDeliveryContext;
+import org.apache.pulsar.broker.delayed.DispatcherDelayedDeliveryContext;
 import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
 import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
 import 
org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
@@ -119,9 +121,9 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                                         long minIndexCountPerBucket, long 
timeStepPerBucketSnapshotSegmentInMillis,
                                         int 
maxIndexesPerBucketSnapshotSegment, int maxNumBuckets)
             throws RecoverDelayedDeliveryTrackerException {
-        this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), 
isDelayedDeliveryDeliverAtTimeStrict,
-                bucketSnapshotStorage, minIndexCountPerBucket, 
timeStepPerBucketSnapshotSegmentInMillis,
-                maxIndexesPerBucketSnapshotSegment, maxNumBuckets);
+        this(new DispatcherDelayedDeliveryContext(dispatcher), timer, 
tickTimeMillis, Clock.systemUTC(),
+                isDelayedDeliveryDeliverAtTimeStrict, bucketSnapshotStorage, 
minIndexCountPerBucket,
+                timeStepPerBucketSnapshotSegmentInMillis, 
maxIndexesPerBucketSnapshotSegment, maxNumBuckets);
     }
 
     public 
BucketDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers 
dispatcher,
@@ -131,7 +133,20 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                                         long minIndexCountPerBucket, long 
timeStepPerBucketSnapshotSegmentInMillis,
                                         int 
maxIndexesPerBucketSnapshotSegment, int maxNumBuckets)
             throws RecoverDelayedDeliveryTrackerException {
-        super(dispatcher, timer, tickTimeMillis, clock, 
isDelayedDeliveryDeliverAtTimeStrict);
+        this(new DispatcherDelayedDeliveryContext(dispatcher), timer, 
tickTimeMillis, clock,
+                isDelayedDeliveryDeliverAtTimeStrict, bucketSnapshotStorage, 
minIndexCountPerBucket,
+                timeStepPerBucketSnapshotSegmentInMillis, 
maxIndexesPerBucketSnapshotSegment, maxNumBuckets);
+    }
+
+    @VisibleForTesting
+    public BucketDelayedDeliveryTracker(DelayedDeliveryContext context,
+                                        Timer timer, long tickTimeMillis, 
Clock clock,
+                                        boolean 
isDelayedDeliveryDeliverAtTimeStrict,
+                                        BucketSnapshotStorage 
bucketSnapshotStorage,
+                                        long minIndexCountPerBucket, long 
timeStepPerBucketSnapshotSegmentInMillis,
+                                        int 
maxIndexesPerBucketSnapshotSegment, int maxNumBuckets)
+            throws RecoverDelayedDeliveryTrackerException {
+        super(context, timer, tickTimeMillis, clock, 
isDelayedDeliveryDeliverAtTimeStrict);
         this.minIndexCountPerBucket = minIndexCountPerBucket;
         this.timeStepPerBucketSnapshotSegmentInMillis = 
timeStepPerBucketSnapshotSegmentInMillis;
         this.maxIndexesPerBucketSnapshotSegment = 
maxIndexesPerBucketSnapshotSegment;
@@ -140,7 +155,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
         this.immutableBuckets = TreeRangeMap.create();
         this.snapshotSegmentLastIndexMap = new ConcurrentHashMap<>();
         this.lastMutableBucket =
-                new MutableBucket(dispatcher.getName(), 
dispatcher.getCursor(), FutureUtil.Sequencer.create(),
+                new MutableBucket(context.getName(), context.getCursor(), 
FutureUtil.Sequencer.create(),
                         bucketSnapshotStorage);
         this.stats = new BucketDelayedMessageIndexStats();
 
@@ -159,7 +174,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
         Map<String, String> cursorProperties = cursor.getCursorProperties();
         if (MapUtils.isEmpty(cursorProperties)) {
             log.info("[{}] Recover delayed message index bucket snapshot 
finish, don't find bucket snapshot",
-                    dispatcher.getName());
+                    context.getName());
             return 0;
         }
         FutureUtil.Sequencer<Void> sequencer = 
this.lastMutableBucket.getSequencer();
@@ -169,7 +184,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                 String[] keys = key.split(DELIMITER);
                 checkArgument(keys.length == 3);
                 ImmutableBucket immutableBucket =
-                        new ImmutableBucket(dispatcher.getName(), cursor, 
sequencer,
+                        new ImmutableBucket(context.getName(), cursor, 
sequencer,
                                 this.lastMutableBucket.bucketSnapshotStorage,
                                 Long.parseLong(keys[1]), 
Long.parseLong(keys[2]));
                 
putAndCleanOverlapRange(Range.closed(immutableBucket.startLedgerId, 
immutableBucket.endLedgerId),
@@ -180,7 +195,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
         Map<Range<Long>, ImmutableBucket> immutableBucketMap = 
immutableBuckets.asMapOfRanges();
         if (immutableBucketMap.isEmpty()) {
             log.info("[{}] Recover delayed message index bucket snapshot 
finish, don't find bucket snapshot",
-                    dispatcher.getName());
+                    context.getName());
             return 0;
         }
 
@@ -194,7 +209,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
         try {
             
FutureUtil.waitForAll(futures.values()).get(AsyncOperationTimeoutSeconds * 5, 
TimeUnit.SECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
-            log.error("[{}] Failed to recover delayed message index bucket 
snapshot.", dispatcher.getName(), e);
+            log.error("[{}] Failed to recover delayed message index bucket 
snapshot.", context.getName(), e);
             if (e instanceof InterruptedException) {
                 Thread.currentThread().interrupt();
             }
@@ -235,7 +250,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
         });
 
         log.info("[{}] Recover delayed message index bucket snapshot finish, 
buckets: {}, numberDelayedMessages: {}",
-                dispatcher.getName(), immutableBucketMap.size(), 
numberDelayedMessages.longValue());
+                context.getName(), immutableBucketMap.size(), 
numberDelayedMessages.longValue());
 
         return numberDelayedMessages.longValue();
     }
@@ -323,7 +338,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                     if (ex == null) {
                         immutableBucket.setSnapshotSegments(null);
                         immutableBucket.asyncUpdateSnapshotLength();
-                        log.info("[{}] Create bucket snapshot finish, 
bucketKey: {}", dispatcher.getName(),
+                        log.info("[{}] Create bucket snapshot finish, 
bucketKey: {}", context.getName(),
                                 immutableBucket.bucketKey());
 
                         
stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.create,
@@ -332,7 +347,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                         return bucketId;
                     }
 
-                    log.error("[{}] Failed to create bucket snapshot, 
bucketKey: {}", dispatcher.getName(),
+                    log.error("[{}] Failed to create bucket snapshot, 
bucketKey: {}", context.getName(),
                             immutableBucket.bucketKey(), ex);
                     
stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.create);
 
@@ -404,7 +419,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
         numberDelayedMessages.incrementAndGet();
 
         if (log.isDebugEnabled()) {
-            log.debug("[{}] Add message {}:{} -- Delivery in {} ms ", 
dispatcher.getName(), ledgerId, entryId,
+            log.debug("[{}] Add message {}:{} -- Delivery in {} ms ", 
context.getName(), ledgerId, entryId,
                     deliverAt - clock.millis());
         }
 
@@ -455,14 +470,14 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
         List<ImmutableBucket> toBeMergeImmutableBuckets = 
selectMergedBuckets(immutableBucketList, MAX_MERGE_NUM);
 
         if (toBeMergeImmutableBuckets.isEmpty()) {
-            log.warn("[{}] Can't find able merged buckets", 
dispatcher.getName());
+            log.warn("[{}] Can't find able merged buckets", context.getName());
             return CompletableFuture.completedFuture(null);
         }
 
         final String bucketsStr = 
toBeMergeImmutableBuckets.stream().map(Bucket::bucketKey).collect(
                 Collectors.joining(",")).replaceAll(DELAYED_BUCKET_KEY_PREFIX 
+ "_", "");
         if (log.isDebugEnabled()) {
-            log.info("[{}] Merging bucket snapshot, bucketKeys: {}", 
dispatcher.getName(), bucketsStr);
+            log.info("[{}] Merging bucket snapshot, bucketKeys: {}", 
context.getName(), bucketsStr);
         }
 
         for (ImmutableBucket immutableBucket : toBeMergeImmutableBuckets) {
@@ -479,12 +494,12 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
             }
             if (ex != null) {
                 log.error("[{}] Failed to merge bucket snapshot, bucketKeys: 
{}",
-                        dispatcher.getName(), bucketsStr, ex);
+                        context.getName(), bucketsStr, ex);
 
                 
stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.merge);
             } else {
                 log.info("[{}] Merge bucket snapshot finish, bucketKeys: {}, 
bucketNum: {}",
-                        dispatcher.getName(), bucketsStr, 
immutableBuckets.asMapOfRanges().size());
+                        context.getName(), bucketsStr, 
immutableBuckets.asMapOfRanges().size());
 
                 
stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.merge,
                         System.currentTimeMillis() - mergeStartTime);
@@ -599,7 +614,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
         if (!checkPendingLoadDone()) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Skip getScheduledMessages to wait for bucket 
snapshot load finish.",
-                        dispatcher.getName());
+                        context.getName());
             }
             return Collections.emptyNavigableSet();
         }
@@ -627,19 +642,19 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                 // All message of current snapshot segment are scheduled, try 
load next snapshot segment
                 if (bucket.merging) {
                     log.info("[{}] Skip load to wait for bucket snapshot merge 
finish, bucketKey:{}",
-                            dispatcher.getName(), bucket.bucketKey());
+                            context.getName(), bucket.bucketKey());
                     break;
                 }
 
                 final int preSegmentEntryId = bucket.currentSegmentEntryId;
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Loading next bucket snapshot segment, 
bucketKey: {}, nextSegmentEntryId: {}",
-                            dispatcher.getName(), bucket.bucketKey(), 
preSegmentEntryId + 1);
+                            context.getName(), bucket.bucketKey(), 
preSegmentEntryId + 1);
                 }
                 boolean createFutureDone = 
bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone();
                 if (!createFutureDone) {
                     log.info("[{}] Skip load to wait for bucket snapshot 
create finish, bucketKey:{}",
-                            dispatcher.getName(), bucket.bucketKey());
+                            context.getName(), bucket.bucketKey());
                     break;
                 }
 
@@ -671,12 +686,12 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                         bucket.setCurrentSegmentEntryId(preSegmentEntryId);
 
                         log.error("[{}] Failed to load bucket snapshot 
segment, bucketKey: {}, segmentEntryId: {}",
-                                dispatcher.getName(), bucket.bucketKey(), 
preSegmentEntryId + 1, ex);
+                                context.getName(), bucket.bucketKey(), 
preSegmentEntryId + 1, ex);
 
                         
stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.load);
                     } else {
                         log.info("[{}] Load next bucket snapshot segment 
finish, bucketKey: {}, segmentEntryId: {}",
-                                dispatcher.getName(), bucket.bucketKey(),
+                                context.getName(), bucket.bucketKey(),
                                 (preSegmentEntryId == 
bucket.lastSegmentEntryId) ? "-1" : preSegmentEntryId + 1);
 
                         
stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.load,
@@ -742,7 +757,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                     .map(bucket -> 
bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE)).toList();
             
FutureUtil.waitForAll(completableFutures).get(AsyncOperationTimeoutSeconds, 
TimeUnit.SECONDS);
         } catch (Exception e) {
-            log.warn("[{}] Failed wait to snapshot generate", 
dispatcher.getName(), e);
+            log.warn("[{}] Failed wait to snapshot generate", 
context.getName(), e);
         }
     }
 

Reply via email to