This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d553cec5485 [improve][ml] Warn and emit metric when cursor ack state exceeds persist limits (#25548)
d553cec5485 is described below

commit d553cec5485bdaa359b5bfdaea0162f10d3c13e5
Author: Alexandre Boyer <[email protected]>
AuthorDate: Tue Apr 21 14:10:24 2026 +0200

    [improve][ml] Warn and emit metric when cursor ack state exceeds persist limits (#25548)
---
 conf/broker.conf                                   |   4 +
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  58 ++++++++-
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |   1 +
 .../impl/OpenTelemetryManagedCursorStats.java      |  35 +++++
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 144 +++++++++++++++++++++
 5 files changed, 240 insertions(+), 2 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 0447b3e05ac..4d6bb9162bf 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1424,6 +1424,8 @@ managedLedgerMaxReadsInFlightSizeInMB=0
 # that were acknowledged. After the max number of ranges is reached, the 
information
 # will only be tracked in memory and messages will be redelivered in case of
 # crashes.
+# Truncations emit a WARN log and increment
+# pulsar.broker.managed_ledger.cursor.persist.unacked_ranges.truncated.
 managedLedgerMaxUnackedRangesToPersist=10000
 
 # Maximum number of partially acknowledged batch messages per subscription 
that will have their batch
@@ -1431,6 +1433,8 @@ managedLedgerMaxUnackedRangesToPersist=10000
 # When this limit is exceeded, remaining batch message containing the batch 
deleted indexes will
 # only be tracked in memory. In case of broker restarts or load balancing 
events, the batch
 # deleted indexes will be cleared while redelivering the messages to consumers.
+# Truncations emit a WARN log and increment
+# pulsar.broker.managed_ledger.cursor.persist.batch_deleted_indexes.truncated.
 managedLedgerMaxBatchDeletedIndexToPersist=10000
 
 # When storing acknowledgement state, choose a more compact serialization 
format that stores
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 5eb81c9e691..c42ccf69f86 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -234,6 +234,10 @@ public class ManagedCursorImpl implements ManagedCursor {
     // active state cache in ManagedCursor. It should be in sync with the 
state in activeCursors in ManagedLedger.
     private volatile boolean isActive = false;
 
+    // Emit the truncation WARN logs exactly once per crossing.
+    private final AtomicBoolean lastCursorDataFullyPersistable = new 
AtomicBoolean(true);
+    private final AtomicBoolean lastBatchDeletedIndexFullyPersistable = new 
AtomicBoolean(true);
+
     // This is a lock used to update the registration state of the cursor in 
the managed ledger.
     private final Object registerToWaitingCursorsLock = new Object();
     // This is used to track if the cursor is registered in the managed 
ledger's waitingCursors queue
@@ -3375,8 +3379,14 @@ public class ManagedCursorImpl implements ManagedCursor {
 
             AtomicInteger acksSerializedSize = new AtomicInteger(0);
             List<MessageRange> rangeList = new ArrayList<>();
+            final int maxRanges = getConfig().getMaxUnackedRangesToPersist();
+            final MutableBoolean truncated = new MutableBoolean(false);
 
             individualDeletedMessages.forEachRawRange((lowerKey, lowerValue, 
upperKey, upperValue) -> {
+                if (rangeList.size() >= maxRanges) {
+                    truncated.setTrue();
+                    return false;
+                }
                 MessageRange messageRange = new MessageRange();
                 messageRange.setLowerEndpoint()
                         .setLedgerId(lowerKey)
@@ -3388,11 +3398,33 @@ public class ManagedCursorImpl implements ManagedCursor 
{
                 acksSerializedSize.addAndGet(messageRange.getSerializedSize());
                 rangeList.add(messageRange);
 
-                return rangeList.size() <= 
getConfig().getMaxUnackedRangesToPersist();
+                return true;
             });
 
             this.individualDeletedMessagesSerializedSize = 
acksSerializedSize.get();
             individualDeletedMessages.resetDirtyKeys();
+
+            if (truncated.booleanValue()) {
+                ledger.getFactory().getOpenTelemetryManagedCursorStats()
+                        .incrementPersistUnackedRangesTruncated(this);
+                if (lastCursorDataFullyPersistable.compareAndSet(true, false)) 
{
+                    int totalRanges = individualDeletedMessages.size();
+                    log.warn()
+                        .attr("totalRanges", totalRanges)
+                        .attr("maxRanges", maxRanges)
+                        .attr("truncated", totalRanges - rangeList.size())
+                        .log("Individually deleted message ranges exceed"
+                            + " managedLedgerMaxUnackedRangesToPersist."
+                            + " Acknowledged messages beyond this limit are 
not persisted"
+                            + " and will be replayed on broker restart."
+                            + " Consider raising 
managedLedgerMaxUnackedRangesToPersist,"
+                            + " verifying 
managedLedgerPersistIndividualAckAsLongArray=true (the default),"
+                            + " and setting 
managedCursorInfoCompressionType=LZ4 to reduce the persisted size.");
+                }
+            } else {
+                lastCursorDataFullyPersistable.compareAndSet(false, true);
+            }
+
             return rangeList;
         } finally {
             lock.writeLock().unlock();
@@ -3407,7 +3439,8 @@ public class ManagedCursorImpl implements ManagedCursor {
             }
             List<BatchedEntryDeletionIndexInfo> result = new ArrayList<>();
             final var iterator = batchDeletedIndexes.entrySet().iterator();
-            while (iterator.hasNext() && result.size() < 
getConfig().getMaxBatchDeletedIndexToPersist()) {
+            int maxIndexes = getConfig().getMaxBatchDeletedIndexToPersist();
+            while (iterator.hasNext() && result.size() < maxIndexes) {
                 final var entry = iterator.next();
                 BatchedEntryDeletionIndexInfo batchDeletedIndexInfo = new 
BatchedEntryDeletionIndexInfo();
                 batchDeletedIndexInfo.setPosition()
@@ -3419,6 +3452,27 @@ public class ManagedCursorImpl implements ManagedCursor {
                 }
                 result.add(batchDeletedIndexInfo);
             }
+
+            if (iterator.hasNext()) {
+                ledger.getFactory().getOpenTelemetryManagedCursorStats()
+                        .incrementPersistBatchDeletedIndexesTruncated(this);
+                if (lastBatchDeletedIndexFullyPersistable.compareAndSet(true, 
false)) {
+                    int totalIndexes = batchDeletedIndexes.size();
+                    log.warn()
+                        .attr("totalIndexes", totalIndexes)
+                        .attr("maxIndexes", maxIndexes)
+                        .attr("truncated", totalIndexes - result.size())
+                        .log("Batch deleted indexes exceed"
+                            + " managedLedgerMaxBatchDeletedIndexToPersist."
+                            + " Partially acknowledged batch messages beyond 
this limit are not persisted"
+                            + " and will be replayed on broker restart."
+                            + " Consider raising 
managedLedgerMaxBatchDeletedIndexToPersist"
+                            + " and setting 
managedCursorInfoCompressionType=LZ4 to reduce the persisted size.");
+                }
+            } else {
+                lastBatchDeletedIndexFullyPersistable.compareAndSet(false, 
true);
+            }
+
             return result;
         } finally {
             lock.readLock().unlock();
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index b6c7d5b0629..60e972e561b 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -140,6 +140,7 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
     private final OpenTelemetryManagedLedgerCacheStats openTelemetryCacheStats;
     @Getter
     private final OpenTelemetryManagedLedgerStats 
openTelemetryManagedLedgerStats;
+    @Getter
     private final OpenTelemetryManagedCursorStats 
openTelemetryManagedCursorStats;
 
     //indicate whether shutdown() is called.
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedCursorStats.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedCursorStats.java
index ec73c9d5e5e..cb544f3b190 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedCursorStats.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedCursorStats.java
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.mledger.impl;
 import com.google.common.collect.Streams;
 import io.opentelemetry.api.OpenTelemetry;
 import io.opentelemetry.api.metrics.BatchCallback;
+import io.opentelemetry.api.metrics.LongCounter;
 import io.opentelemetry.api.metrics.ObservableLongMeasurement;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -55,6 +56,16 @@ public class OpenTelemetryManagedCursorStats implements 
AutoCloseable {
     public static final String INCOMING_BYTE_COUNTER = 
"pulsar.broker.managed_ledger.cursor.incoming.size";
     private final ObservableLongMeasurement incomingByteCounter;
 
+    // Broker-level counters incremented when cursor persistence silently 
truncates ack state.
+    // See managedLedgerMaxUnackedRangesToPersist and 
managedLedgerMaxBatchDeletedIndexToPersist.
+    public static final String PERSIST_UNACKED_RANGES_TRUNCATED =
+            
"pulsar.broker.managed_ledger.cursor.persist.unacked_ranges.truncated";
+    private final LongCounter persistUnackedRangesTruncated;
+
+    public static final String PERSIST_BATCH_DELETED_INDEXES_TRUNCATED =
+            
"pulsar.broker.managed_ledger.cursor.persist.batch_deleted_indexes.truncated";
+    private final LongCounter persistBatchDeletedIndexesTruncated;
+
     private final BatchCallback batchCallback;
 
     public OpenTelemetryManagedCursorStats(OpenTelemetry openTelemetry, 
ManagedLedgerFactoryImpl factory) {
@@ -96,6 +107,22 @@ public class OpenTelemetryManagedCursorStats implements 
AutoCloseable {
                 .setDescription("The total amount of data read from the 
ledger.")
                 .buildObserver();
 
+        persistUnackedRangesTruncated = meter
+                .counterBuilder(PERSIST_UNACKED_RANGES_TRUNCATED)
+                .setUnit("{truncation}")
+                .setDescription("The number of times a cursor exceeded"
+                        + " managedLedgerMaxUnackedRangesToPersist, causing 
ack state to be truncated"
+                        + " at persistence. Ack state beyond the limit is lost 
on broker restart.")
+                .build();
+
+        persistBatchDeletedIndexesTruncated = meter
+                .counterBuilder(PERSIST_BATCH_DELETED_INDEXES_TRUNCATED)
+                .setUnit("{truncation}")
+                .setDescription("The number of times a cursor exceeded"
+                        + " managedLedgerMaxBatchDeletedIndexToPersist, 
causing batch deleted index state"
+                        + " to be truncated at persistence. State beyond the 
limit is lost on broker restart.")
+                .build();
+
         batchCallback = meter.batchCallback(() -> factory.getManagedLedgers()
                         .values()
                         .stream()
@@ -115,6 +142,14 @@ public class OpenTelemetryManagedCursorStats implements 
AutoCloseable {
         batchCallback.close();
     }
 
+    public void incrementPersistUnackedRangesTruncated(ManagedCursor cursor) {
+        persistUnackedRangesTruncated.add(1, 
cursor.getManagedCursorAttributes().getAttributes());
+    }
+
+    public void incrementPersistBatchDeletedIndexesTruncated(ManagedCursor 
cursor) {
+        persistBatchDeletedIndexesTruncated.add(1, 
cursor.getManagedCursorAttributes().getAttributes());
+    }
+
     private void recordMetrics(ManagedCursor cursor) {
         var stats = cursor.getStats();
         var cursorAttributesSet = cursor.getManagedCursorAttributes();
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 1e9fbdb5053..a3eb4756335 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -42,6 +42,8 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
+import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
 import java.lang.reflect.Field;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
@@ -119,6 +121,7 @@ import 
org.apache.bookkeeper.mledger.proto.ManagedLedgerInfo;
 import org.apache.bookkeeper.mledger.proto.PositionInfo;
 import org.apache.bookkeeper.mledger.util.ManagedLedgerTestUtil;
 import org.apache.bookkeeper.mledger.util.ManagedLedgerUtils;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.commons.collections4.iterators.EmptyIterator;
 import org.apache.commons.lang3.mutable.MutableBoolean;
@@ -6032,6 +6035,147 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         assertEquals(properties.get(propertyKey), lastIndex - 1);
     }
 
+    @DataProvider(name = "rangesTruncationScenarios")
+    public static Object[][] rangesTruncationScenarios() {
+        // maxRanges, totalEntries, shouldTruncate.
+        return new Object[][] {
+                { 5, 16, true },   // 8 ack holes, above limit 5 → truncation
+                { 10, 6, false },  // 3 ack holes, limit 10 → no truncation
+                { 5, 6, false },   // 3 ack holes, limit 5 → no truncation
+        };
+    }
+
+    @Test(timeOut = 20000, dataProvider = "rangesTruncationScenarios")
+    public void testPersistUnackedRangesTruncatedCounter(int maxRanges, int 
totalEntries, boolean shouldTruncate)
+            throws Exception {
+        @Cleanup
+        InMemoryMetricReader metricReader = InMemoryMetricReader.create();
+        @Cleanup
+        var openTelemetry = AutoConfiguredOpenTelemetrySdk.builder()
+                .disableShutdownHook()
+                .addPropertiesSupplier(() -> Map.of("otel.metrics.exporter", 
"none",
+                        "otel.traces.exporter", "none",
+                        "otel.logs.exporter", "none"))
+                .addMeterProviderCustomizer((builder, __) -> 
builder.registerMetricReader(metricReader))
+                .build()
+                .getOpenTelemetrySdk();
+
+        @Cleanup("shutdown")
+        ManagedLedgerFactoryImpl otelFactory = new ManagedLedgerFactoryImpl(
+                metadataStore,
+                (policyConfig) -> CompletableFuture.completedFuture(bkc),
+                new ManagedLedgerFactoryConfig(), NullStatsLogger.INSTANCE, 
openTelemetry);
+
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxUnackedRangesToPersist(maxRanges);
+        // Force persistence through the ledger path (not metadata store).
+        config.setMaxUnackedRangesToPersistInMetadataStore(0);
+
+        String ledgerName = 
"my-tenant/my-ns/persistent/test-persist-unacked-ranges-truncated-" + 
UUID.randomUUID();
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
otelFactory.open(ledgerName, config);
+        ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1");
+
+        List<Position> positions = new ArrayList<>();
+        for (int i = 0; i < totalEntries; i++) {
+            positions.add(ledger.addEntry(("entry-" + i).getBytes(Encoding)));
+        }
+        // Ack alternating positions to create ack holes in the cursor.
+        for (int i = 1; i < positions.size(); i += 2) {
+            cursor.delete(positions.get(i));
+        }
+
+        ledger.close();
+
+        long truncationCount = metricReader.collectAllMetrics().stream()
+                .filter(m -> 
OpenTelemetryManagedCursorStats.PERSIST_UNACKED_RANGES_TRUNCATED.equals(m.getName()))
+                .flatMap(m -> m.getLongSumData().getPoints().stream())
+                .mapToLong(point -> point.getValue())
+                .sum();
+
+        // Direction only: persist cadence during close() is not a stable 
contract.
+        if (shouldTruncate) {
+            assertTrue(truncationCount >= 1, "expected truncation, was " + 
truncationCount);
+
+            ManagedLedgerImpl reopened = (ManagedLedgerImpl) 
otelFactory.open(ledgerName, config);
+            ManagedCursorImpl recovered = (ManagedCursorImpl) 
reopened.openCursor("c1");
+            
assertEquals(recovered.getIndividuallyDeletedMessagesSet().asRanges().size(), 
maxRanges,
+                    "persisted range count must equal maxRanges");
+        } else {
+            assertEquals(truncationCount, 0L, "expected no truncation, was " + 
truncationCount);
+        }
+    }
+
+    @DataProvider(name = "batchIndexesTruncationScenarios")
+    public static Object[][] batchIndexesTruncationScenarios() {
+        // maxBatchIndexes, totalEntries, shouldTruncate.
+        return new Object[][] {
+                { 5, 16, true },   // 16 batch entries, above limit 5 → 
truncation
+                { 10, 6, false },  // 6 batch entries, limit 10 → no truncation
+                { 5, 5, false },   // 5 batch entries at exact limit → no 
truncation
+        };
+    }
+
+    @Test(timeOut = 20000, dataProvider = "batchIndexesTruncationScenarios")
+    public void testPersistBatchDeletedIndexesTruncatedCounter(int 
maxBatchIndexes, int totalEntries,
+            boolean shouldTruncate) throws Exception {
+        @Cleanup
+        InMemoryMetricReader metricReader = InMemoryMetricReader.create();
+        @Cleanup
+        var openTelemetry = AutoConfiguredOpenTelemetrySdk.builder()
+                .disableShutdownHook()
+                .addPropertiesSupplier(() -> Map.of("otel.metrics.exporter", 
"none",
+                        "otel.traces.exporter", "none",
+                        "otel.logs.exporter", "none"))
+                .addMeterProviderCustomizer((builder, __) -> 
builder.registerMetricReader(metricReader))
+                .build()
+                .getOpenTelemetrySdk();
+
+        @Cleanup("shutdown")
+        ManagedLedgerFactoryImpl otelFactory = new ManagedLedgerFactoryImpl(
+                metadataStore,
+                (policyConfig) -> CompletableFuture.completedFuture(bkc),
+                new ManagedLedgerFactoryConfig(), NullStatsLogger.INSTANCE, 
openTelemetry);
+
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxBatchDeletedIndexToPersist(maxBatchIndexes);
+        config.setDeletionAtBatchIndexLevelEnabled(true);
+        config.setMaxUnackedRangesToPersistInMetadataStore(0);
+
+        String ledgerName = 
"my-tenant/my-ns/persistent/test-persist-batch-deleted-indexes-truncated-"
+                + UUID.randomUUID();
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
otelFactory.open(ledgerName, config);
+        ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1");
+
+        List<Position> positions = new ArrayList<>();
+        for (int i = 0; i < totalEntries; i++) {
+            positions.add(ledger.addEntry(("entry-" + i).getBytes(Encoding)));
+        }
+        // Partial-ack each position so it lands in batchDeletedIndexes rather 
than individualDeletedMessages.
+        for (Position position : positions) {
+            BitSet ackSet = new BitSet(10);
+            ackSet.set(0, 10);
+            ackSet.clear(0);
+            cursor.delete(AckSetStateUtil.createPositionWithAckSet(
+                    position.getLedgerId(), position.getEntryId(), 
ackSet.toLongArray()));
+        }
+
+        ledger.close();
+
+        long truncationCount = metricReader.collectAllMetrics().stream()
+                .filter(m -> 
OpenTelemetryManagedCursorStats.PERSIST_BATCH_DELETED_INDEXES_TRUNCATED
+                        .equals(m.getName()))
+                .flatMap(m -> m.getLongSumData().getPoints().stream())
+                .mapToLong(point -> point.getValue())
+                .sum();
+
+        // Direction only: persist cadence during close() is not a stable 
contract.
+        if (shouldTruncate) {
+            assertTrue(truncationCount >= 1, "expected truncation, was " + 
truncationCount);
+        } else {
+            assertEquals(truncationCount, 0L, "expected no truncation, was " + 
truncationCount);
+        }
+    }
+
     @SuppressWarnings("try")
     class TestPulsarMockBookKeeper extends PulsarMockBookKeeper {
         Map<Long, Integer> ledgerErrors = new HashMap<>();

Reply via email to