This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a303ac9d561ccd068d1efa3a06ceafd2e80f1ee8 Author: Lari Hotari <[email protected]> AuthorDate: Fri Jun 12 18:07:39 2026 +0300 [fix][test][branch-4.0] Backport configurable read/add delays in PulsarMockBookKeeper Partial cherry-pick of the testmocks changes from 490ba0cca18 ([improve][broker] Implement PIP-430 Pulsar Broker cache improvements (#24623)), without the JFR read event interceptor parts which aren't needed on branch-4.0. Required so that CompactionTest compiles after cherry-picking ded1e42d352 (#25998), which uses PulsarMockBookKeeper.setDefaultReadEntriesDelayMillis. --- .../bookkeeper/client/PulsarMockBookKeeper.java | 30 ++++++++++++++++++ .../bookkeeper/client/PulsarMockLedgerHandle.java | 36 +++++++++++++--------- 2 files changed, 51 insertions(+), 15 deletions(-) diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java index 7104ded7460..7bebc0e558f 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java @@ -75,6 +75,8 @@ public class PulsarMockBookKeeper extends BookKeeper { final OrderedExecutor orderedExecutor; final ExecutorService executor; final ScheduledExecutorService scheduler; + private volatile long defaultAddEntryDelayMillis = 1L; + private volatile long defaultReadEntriesDelayMillis = 1L; @Override public ClientConfiguration getConf() { @@ -492,5 +494,33 @@ public class PulsarMockBookKeeper extends BookKeeper { return metadataClientDriver; } + public long getReadEntriesDelayMillis() { + return defaultReadEntriesDelayMillis; + } + + public long getNextAddEntryDelayMillis() { + Long delay = addEntryDelaysMillis.poll(); + if (delay != null) { + return delay; + } + return defaultAddEntryDelayMillis; + } + + public long getNextAddEntryResponseDelayMillis() { + Long delay = addEntryResponseDelaysMillis.poll(); + if (delay != null) { + return delay; + } + return 0; + } + + public void setDefaultAddEntryDelayMillis(long defaultAddEntryDelayMillis) { + this.defaultAddEntryDelayMillis = defaultAddEntryDelayMillis; + } + + public void setDefaultReadEntriesDelayMillis(long defaultReadEntriesDelayMillis) { + this.defaultReadEntriesDelayMillis = defaultReadEntriesDelayMillis; + } + private static final Logger log = LoggerFactory.getLogger(PulsarMockBookKeeper.class); } diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java index 4d1fd1380c8..a5403774133 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java @@ -113,18 +113,26 @@ public class PulsarMockLedgerHandle extends LedgerHandle { @Override public void asyncReadEntries(final long firstEntry, final long lastEntry, final ReadCallback cb, final Object ctx) { bk.getProgrammedFailure().thenComposeAsync((res) -> { - log.debug("readEntries: first={} last={} total={}", firstEntry, lastEntry, entries.size()); + if (log.isDebugEnabled()) { + log.debug("readEntries: first={} last={} total={}", firstEntry, lastEntry, entries.size()); + } final Queue<LedgerEntry> seq = new ArrayDeque<LedgerEntry>(); long entryId = firstEntry; while (entryId <= lastEntry && entryId < entries.size()) { seq.add(new LedgerEntry(entries.get((int) entryId++).duplicate())); } - log.debug("Entries read: {}", seq); + if (log.isDebugEnabled()) { + log.debug("Entries read: {}", seq); + } - try { - Thread.sleep(1); - } catch (InterruptedException e) { + long readEntriesDelay = bk.getReadEntriesDelayMillis(); + if (readEntriesDelay > 0) { + try { + Thread.sleep(readEntriesDelay); + } catch (InterruptedException e) { + // ignore + } } Enumeration<LedgerEntry> entries = new Enumeration<LedgerEntry>() { @@ -182,14 +190,12 @@ public class PulsarMockLedgerHandle extends LedgerHandle { @Override public void asyncAddEntry(final ByteBuf data, final AddCallback cb, final Object ctx) { bk.getAddEntryFailure().thenComposeAsync((res) -> { - Long delayMillis = bk.addEntryDelaysMillis.poll(); - if (delayMillis == null) { - delayMillis = 1L; - } - - try { - Thread.sleep(delayMillis); - } catch (InterruptedException e) { + long delayMillis = bk.getNextAddEntryDelayMillis(); + if (delayMillis > 0) { + try { + Thread.sleep(delayMillis); + } catch (InterruptedException e) { + } } if (fenced) { @@ -211,8 +217,8 @@ public class PulsarMockLedgerHandle extends LedgerHandle { cb.addComplete(PulsarMockBookKeeper.getExceptionCode(exception), PulsarMockLedgerHandle.this, LedgerHandle.INVALID_ENTRY_ID, ctx); } else { - Long responseDelayMillis = bk.addEntryResponseDelaysMillis.poll(); - if (responseDelayMillis != null) { + long responseDelayMillis = bk.getNextAddEntryResponseDelayMillis(); + if (responseDelayMillis > 0) { try { Thread.sleep(responseDelayMillis); } catch (InterruptedException e) {
