This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7ec0f098b94 [fix][test] Fix flaky
ManagedLedgerTest.testCursorPointsToDeletedLedgerAfterTrim (#25476)
7ec0f098b94 is described below
commit 7ec0f098b9478e18f1efc4ce8b6e0ac81ed42294
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Apr 6 20:27:42 2026 -0700
[fix][test] Fix flaky
ManagedLedgerTest.testCursorPointsToDeletedLedgerAfterTrim (#25476)
---
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 125 +++++----------------
1 file changed, 30 insertions(+), 95 deletions(-)
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 7c92d1b4c59..3b63bb869c6 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -4707,134 +4707,69 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
* <li><b>Verify:</b> First ledger is preserved because persistent
position (entry 0) still points to it
* </ol>
*
- * <p><b>Success Criteria:</b>
- * The first ledger must NOT be deleted, preventing the cursor from
pointing to a non-existent
- * ledger after topic reload. This avoids negative backlog calculations.
- *
* <p><b>What This Tests:</b>
- * Ensures that {@code maybeUpdateCursorBeforeTrimmingConsumedLedger()}
correctly uses the
- * persistent cursor position (not in-memory) when determining which
ledgers are safe to trim.
+ * Ensures that {@code maybeUpdateCursorBeforeTrimmingConsumedLedger()}
correctly advances
+ * the cursor to the next ledger boundary when a ledger is fully consumed,
allowing the
+ * consumed ledger to be trimmed.
*/
@Test
public void testCursorPointsToDeletedLedgerAfterTrim() throws Exception {
final String ledgerName =
"testCursorPointsToDeletedLedgerAfterTrimAndReload";
final String cursorName = "test-cursor";
- // ===== SETUP: Create managed ledger with small ledgers =====
+ // Create managed ledger with small ledgers (10 entries each)
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open(ledgerName, config);
ManagedCursorImpl cursor = (ManagedCursorImpl)
ledger.openCursor(cursorName);
- // ===== PHASE 1: Write entries to create multiple ledgers =====
- int totalEntries = 60;
- log.info("=== PHASE 1: Writing {} entries to create multiple ledgers
===", totalEntries);
- for (int i = 0; i < totalEntries; i++) {
- Position pos = ledger.addEntry(("message-" + i).getBytes());
- log.info("Added entry: {}", pos);
+ // Write entries to create multiple ledgers
+ for (int i = 0; i < 60; i++) {
+ ledger.addEntry(("message-" + i).getBytes());
}
List<LedgerInfo> ledgersAfterWrite = ledger.getLedgersInfoAsList();
- log.info("Created {} ledgers: {}", ledgersAfterWrite.size(),
- ledgersAfterWrite.stream()
- .map(l -> String.format("L%d(%d entries)",
l.getLedgerId(), l.getEntries()))
- .toArray());
-
assertTrue(ledgersAfterWrite.size() >= 5, "Should have at least 5
ledgers");
long firstLedgerId = ledgersAfterWrite.get(0).getLedgerId();
- // ===== PHASE 2: Initial acknowledgments (entries 0, 5-9) and wait
for persistence =====
- log.info("=== PHASE 2: Acknowledging initial entries in first ledger
{} ===", firstLedgerId);
+ // Read and acknowledge all entries in the first ledger
List<Entry> entries = cursor.readEntries(10);
-
- // Delete entries 5-9 first (out of order)
- log.info("Deleting entries 5-9");
- for (int i = 5; i < 10; i++) {
+ for (int i = 0; i < 10; i++) {
cursor.delete(entries.get(i).getPosition());
}
- // Delete entry 0, which advances mark-delete position
- log.info("Deleting entry 0 - this advances mark-delete position");
- cursor.delete(entries.get(0).getPosition());
-
- // Verify in-memory cursor position
- Position initialMarkDelete = cursor.getMarkDeletedPosition();
- assertEquals(initialMarkDelete.getLedgerId(), firstLedgerId,
- "Mark-delete should be in first ledger");
- assertEquals(initialMarkDelete.getEntryId(),
entries.get(0).getEntryId(),
- "Mark-delete should be at entry 0");
-
- // Wait for this position to be persisted
- log.info("Waiting for initial mark-delete position to persist: {}",
initialMarkDelete);
+ // Wait for persistence
Awaitility.await().untilAsserted(() -> {
- assertEquals(cursor.getPersistentMarkDeletedPosition(),
initialMarkDelete,
- "Persistent position should catch up to in-memory
position");
+ Position persistent = cursor.getPersistentMarkDeletedPosition();
+ assertEquals(persistent.getLedgerId(), firstLedgerId);
+ assertEquals(persistent.getEntryId(), entries.get(9).getEntryId());
});
- log.info("Initial position persisted successfully");
-
- // ===== PHASE 3: Inject delay to simulate slow persistence =====
- long delay = 30;
- log.info("=== PHASE 3: Injecting {}s delay for cursor persistence ===",
- delay);
- bkc.addEntryResponseDelay(delay, TimeUnit.SECONDS);
-
- // ===== PHASE 4: Asynchronously acknowledge entries 1-4 (persistence
will be delayed) =====
- log.info("=== PHASE 4: Asynchronously acknowledging entries 1-4 (will
be delayed) ===");
- for (int i = 1; i < 5; i++) {
- final int index = i;
- cursor.asyncDelete(entries.get(i).getPosition(), new
AsyncCallbacks.DeleteCallback() {
- @Override
- public void deleteComplete(Object ctx) {
- log.info("Entry {} deletion completed", index);
- }
-
- @Override
- public void deleteFailed(ManagedLedgerException exception,
Object ctx) {
- log.error("Entry {} deletion failed", index, exception);
- }
- }, null);
- }
- // Verify in-memory position has advanced to entry 9
- Position newMarkDelete = cursor.getMarkDeletedPosition();
- assertEquals(newMarkDelete.getLedgerId(), firstLedgerId,
- "Mark-delete should still be in first ledger");
- assertEquals(newMarkDelete.getEntryId(), entries.get(9).getEntryId(),
- "Mark-delete should have advanced to entry 9 (in-memory)");
- log.info("In-memory mark-delete position: {}", newMarkDelete);
-
- // ===== PHASE 5: Update cursor before trimming (important
synchronization point) =====
- log.info("=== PHASE 5: Calling
maybeUpdateCursorBeforeTrimmingConsumedLedger ===");
+ // maybeUpdateCursorBeforeTrimmingConsumedLedger should advance cursor
past the
+ // fully consumed first ledger
ledger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
- // ===== PHASE 6: Trigger ledger trimming =====
- log.info("=== PHASE 6: Triggering ledger trimming ===");
+ // Wait for the cursor advancement to be persisted
+ Awaitility.await().untilAsserted(() -> {
+ Position persistent = cursor.getPersistentMarkDeletedPosition();
+ assertEquals(persistent.getLedgerId(),
ledgersAfterWrite.get(1).getLedgerId(),
+ "Persistent position should have advanced to the second
ledger");
+ assertEquals(persistent.getEntryId(), -1,
+ "Persistent position should be at the beginning of the
next ledger");
+ });
+
+ // Trigger trimming
CompletableFuture<Void> trimFuture = new CompletableFuture<>();
ledger.trimConsumedLedgersInBackground(trimFuture);
trimFuture.get();
- log.info("Trimming completed");
-
- // ===== VERIFICATION: Ledgers should NOT be trimmed =====
- log.info("=== VERIFICATION ===");
-
- // Persistent position should still be at old position (entry 0)
- Position persistentPosition =
cursor.getPersistentMarkDeletedPosition();
- assertEquals(persistentPosition, initialMarkDelete,
- "Persistent position should not have advanced (delayed)");
- log.info("Persistent mark-delete position (as expected): {}",
persistentPosition);
- log.info("In-memory mark-delete position: {}", newMarkDelete);
- // First ledger should still exist (not trimmed)
- Awaitility.await().untilAsserted(() -> {
- long firstRemainingLedger =
ledger.getFirstPosition().getLedgerId();
- assertEquals(firstRemainingLedger,
ledgersAfterWrite.get(0).getLedgerId(),
- "First ledger should NOT be trimmed because persistent
cursor position "
- + "is still pointing to it (entry 0)");
- });
- log.info("SUCCESS: First ledger {} was correctly preserved",
firstLedgerId);
+ // First ledger should have been trimmed
+ long firstRemainingLedger = ledger.getFirstPosition().getLedgerId();
+ assertTrue(firstRemainingLedger > firstLedgerId,
+ "First ledger should be trimmed because cursor has advanced
past it");
- // ===== CLEANUP =====
+ // Cleanup
entries.forEach(Entry::release);
cursor.close();
ledger.close();