This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ec0f098b94 [fix][test] Fix flaky 
ManagedLedgerTest.testCursorPointsToDeletedLedgerAfterTrim (#25476)
7ec0f098b94 is described below

commit 7ec0f098b9478e18f1efc4ce8b6e0ac81ed42294
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Apr 6 20:27:42 2026 -0700

    [fix][test] Fix flaky 
ManagedLedgerTest.testCursorPointsToDeletedLedgerAfterTrim (#25476)
---
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 125 +++++----------------
 1 file changed, 30 insertions(+), 95 deletions(-)

diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 7c92d1b4c59..3b63bb869c6 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -4707,134 +4707,69 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
      *   <li><b>Verify:</b> First ledger is preserved because persistent 
position (entry 0) still points to it
      * </ol>
      *
-     * <p><b>Success Criteria:</b>
-     * The first ledger must NOT be deleted, preventing the cursor from 
pointing to a non-existent
-     * ledger after topic reload. This avoids negative backlog calculations.
-     *
      * <p><b>What This Tests:</b>
-     * Ensures that {@code maybeUpdateCursorBeforeTrimmingConsumedLedger()} 
correctly uses the
-     * persistent cursor position (not in-memory) when determining which 
ledgers are safe to trim.
+     * Ensures that {@code maybeUpdateCursorBeforeTrimmingConsumedLedger()} 
correctly advances
+     * the cursor to the next ledger boundary when a ledger is fully consumed, 
allowing the
+     * consumed ledger to be trimmed.
      */
     @Test
     public void testCursorPointsToDeletedLedgerAfterTrim() throws Exception {
         final String ledgerName = 
"testCursorPointsToDeletedLedgerAfterTrimAndReload";
         final String cursorName = "test-cursor";
 
-        // ===== SETUP: Create managed ledger with small ledgers =====
+        // Create managed ledger with small ledgers (10 entries each)
         ManagedLedgerConfig config = new ManagedLedgerConfig();
         config.setMaxEntriesPerLedger(10);
 
         ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open(ledgerName, config);
         ManagedCursorImpl cursor = (ManagedCursorImpl) 
ledger.openCursor(cursorName);
 
-        // ===== PHASE 1: Write entries to create multiple ledgers =====
-        int totalEntries = 60;
-        log.info("=== PHASE 1: Writing {} entries to create multiple ledgers 
===", totalEntries);
-        for (int i = 0; i < totalEntries; i++) {
-            Position pos = ledger.addEntry(("message-" + i).getBytes());
-            log.info("Added entry: {}", pos);
+        // Write entries to create multiple ledgers
+        for (int i = 0; i < 60; i++) {
+            ledger.addEntry(("message-" + i).getBytes());
         }
 
         List<LedgerInfo> ledgersAfterWrite = ledger.getLedgersInfoAsList();
-        log.info("Created {} ledgers: {}", ledgersAfterWrite.size(),
-                ledgersAfterWrite.stream()
-                        .map(l -> String.format("L%d(%d entries)", 
l.getLedgerId(), l.getEntries()))
-                        .toArray());
-
         assertTrue(ledgersAfterWrite.size() >= 5, "Should have at least 5 
ledgers");
         long firstLedgerId = ledgersAfterWrite.get(0).getLedgerId();
 
-        // ===== PHASE 2: Initial acknowledgments (entries 0, 5-9) and wait 
for persistence =====
-        log.info("=== PHASE 2: Acknowledging initial entries in first ledger 
{} ===", firstLedgerId);
+        // Read and acknowledge all entries in the first ledger
         List<Entry> entries = cursor.readEntries(10);
-
-        // Delete entries 5-9 first (out of order)
-        log.info("Deleting entries 5-9");
-        for (int i = 5; i < 10; i++) {
+        for (int i = 0; i < 10; i++) {
             cursor.delete(entries.get(i).getPosition());
         }
 
-        // Delete entry 0, which advances mark-delete position
-        log.info("Deleting entry 0 - this advances mark-delete position");
-        cursor.delete(entries.get(0).getPosition());
-
-        // Verify in-memory cursor position
-        Position initialMarkDelete = cursor.getMarkDeletedPosition();
-        assertEquals(initialMarkDelete.getLedgerId(), firstLedgerId,
-                "Mark-delete should be in first ledger");
-        assertEquals(initialMarkDelete.getEntryId(), 
entries.get(0).getEntryId(),
-                "Mark-delete should be at entry 0");
-
-        // Wait for this position to be persisted
-        log.info("Waiting for initial mark-delete position to persist: {}", 
initialMarkDelete);
+        // Wait for persistence
         Awaitility.await().untilAsserted(() -> {
-            assertEquals(cursor.getPersistentMarkDeletedPosition(), 
initialMarkDelete,
-                    "Persistent position should catch up to in-memory 
position");
+            Position persistent = cursor.getPersistentMarkDeletedPosition();
+            assertEquals(persistent.getLedgerId(), firstLedgerId);
+            assertEquals(persistent.getEntryId(), entries.get(9).getEntryId());
         });
-        log.info("Initial position persisted successfully");
-
-        // ===== PHASE 3: Inject delay to simulate slow persistence =====
-        long delay = 30;
-        log.info("=== PHASE 3: Injecting {}s delay for cursor persistence ===",
-                delay);
-        bkc.addEntryResponseDelay(delay, TimeUnit.SECONDS);
-
-        // ===== PHASE 4: Asynchronously acknowledge entries 1-4 (persistence 
will be delayed) =====
-        log.info("=== PHASE 4: Asynchronously acknowledging entries 1-4 (will 
be delayed) ===");
-        for (int i = 1; i < 5; i++) {
-            final int index = i;
-            cursor.asyncDelete(entries.get(i).getPosition(), new 
AsyncCallbacks.DeleteCallback() {
-                @Override
-                public void deleteComplete(Object ctx) {
-                    log.info("Entry {} deletion completed", index);
-                }
-
-                @Override
-                public void deleteFailed(ManagedLedgerException exception, 
Object ctx) {
-                    log.error("Entry {} deletion failed", index, exception);
-                }
-            }, null);
-        }
 
-        // Verify in-memory position has advanced to entry 9
-        Position newMarkDelete = cursor.getMarkDeletedPosition();
-        assertEquals(newMarkDelete.getLedgerId(), firstLedgerId,
-                "Mark-delete should still be in first ledger");
-        assertEquals(newMarkDelete.getEntryId(), entries.get(9).getEntryId(),
-                "Mark-delete should have advanced to entry 9 (in-memory)");
-        log.info("In-memory mark-delete position: {}", newMarkDelete);
-
-        // ===== PHASE 5: Update cursor before trimming (important 
synchronization point) =====
-        log.info("=== PHASE 5: Calling 
maybeUpdateCursorBeforeTrimmingConsumedLedger ===");
+        // maybeUpdateCursorBeforeTrimmingConsumedLedger should advance cursor 
past the
+        // fully consumed first ledger
         ledger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
 
-        // ===== PHASE 6: Trigger ledger trimming =====
-        log.info("=== PHASE 6: Triggering ledger trimming ===");
+        // Wait for the cursor advancement to be persisted
+        Awaitility.await().untilAsserted(() -> {
+            Position persistent = cursor.getPersistentMarkDeletedPosition();
+            assertEquals(persistent.getLedgerId(), 
ledgersAfterWrite.get(1).getLedgerId(),
+                    "Persistent position should have advanced to the second 
ledger");
+            assertEquals(persistent.getEntryId(), -1,
+                    "Persistent position should be at the beginning of the 
next ledger");
+        });
+
+        // Trigger trimming
         CompletableFuture<Void> trimFuture = new CompletableFuture<>();
         ledger.trimConsumedLedgersInBackground(trimFuture);
         trimFuture.get();
-        log.info("Trimming completed");
-
-        // ===== VERIFICATION: Ledgers should NOT be trimmed =====
-        log.info("=== VERIFICATION ===");
-
-        // Persistent position should still be at old position (entry 0)
-        Position persistentPosition = 
cursor.getPersistentMarkDeletedPosition();
-        assertEquals(persistentPosition, initialMarkDelete,
-                "Persistent position should not have advanced (delayed)");
-        log.info("Persistent mark-delete position (as expected): {}", 
persistentPosition);
-        log.info("In-memory mark-delete position: {}", newMarkDelete);
 
-        // First ledger should still exist (not trimmed)
-        Awaitility.await().untilAsserted(() -> {
-            long firstRemainingLedger = 
ledger.getFirstPosition().getLedgerId();
-            assertEquals(firstRemainingLedger, 
ledgersAfterWrite.get(0).getLedgerId(),
-                    "First ledger should NOT be trimmed because persistent 
cursor position "
-                            + "is still pointing to it (entry 0)");
-        });
-        log.info("SUCCESS: First ledger {} was correctly preserved", 
firstLedgerId);
+        // First ledger should have been trimmed
+        long firstRemainingLedger = ledger.getFirstPosition().getLedgerId();
+        assertTrue(firstRemainingLedger > firstLedgerId,
+                "First ledger should be trimmed because cursor has advanced 
past it");
 
-        // ===== CLEANUP =====
+        // Cleanup
         entries.forEach(Entry::release);
         cursor.close();
         ledger.close();

Reply via email to