This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d553cec5485 [improve][ml] Warn and emit metric when cursor ack state exceeds persist limits (#25548)
d553cec5485 is described below
commit d553cec5485bdaa359b5bfdaea0162f10d3c13e5
Author: Alexandre Boyer <[email protected]>
AuthorDate: Tue Apr 21 14:10:24 2026 +0200
[improve][ml] Warn and emit metric when cursor ack state exceeds persist limits (#25548)
---
conf/broker.conf | 4 +
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 58 ++++++++-
.../mledger/impl/ManagedLedgerFactoryImpl.java | 1 +
.../impl/OpenTelemetryManagedCursorStats.java | 35 +++++
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 144 +++++++++++++++++++++
5 files changed, 240 insertions(+), 2 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 0447b3e05ac..4d6bb9162bf 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1424,6 +1424,8 @@ managedLedgerMaxReadsInFlightSizeInMB=0
# that were acknowledged. After the max number of ranges is reached, the information
# will only be tracked in memory and messages will be redelivered in case of
# crashes.
+# A WARN log is emitted once each time the limit starts being exceeded, and every
+# truncated persist increments pulsar.broker.managed_ledger.cursor.persist.unacked_ranges.truncated.
managedLedgerMaxUnackedRangesToPersist=10000
# Maximum number of partially acknowledged batch messages per subscription that will have their batch
@@ -1431,6 +1433,8 @@ managedLedgerMaxUnackedRangesToPersist=10000
# When this limit is exceeded, remaining batch message containing the batch deleted indexes will
# only be tracked in memory. In case of broker restarts or load balancing events, the batch
# deleted indexes will be cleared while redelivering the messages to consumers.
+# A WARN log is emitted once each time the limit starts being exceeded, and every
+# truncated persist increments pulsar.broker.managed_ledger.cursor.persist.batch_deleted_indexes.truncated.
managedLedgerMaxBatchDeletedIndexToPersist=10000
# When storing acknowledgement state, choose a more compact serialization format that stores
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 5eb81c9e691..c42ccf69f86 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -234,6 +234,10 @@ public class ManagedCursorImpl implements ManagedCursor {
// active state cache in ManagedCursor. It should be in sync with the
state in activeCursors in ManagedLedger.
private volatile boolean isActive = false;
+ // Emit the truncation WARN logs exactly once per crossing.
+ private final AtomicBoolean lastCursorDataFullyPersistable = new
AtomicBoolean(true);
+ private final AtomicBoolean lastBatchDeletedIndexFullyPersistable = new
AtomicBoolean(true);
+
// This is a lock used to update the registration state of the cursor in
the managed ledger.
private final Object registerToWaitingCursorsLock = new Object();
// This is used to track if the cursor is registered in the managed
ledger's waitingCursors queue
@@ -3375,8 +3379,14 @@ public class ManagedCursorImpl implements ManagedCursor {
AtomicInteger acksSerializedSize = new AtomicInteger(0);
List<MessageRange> rangeList = new ArrayList<>();
+ final int maxRanges = getConfig().getMaxUnackedRangesToPersist();
+ final MutableBoolean truncated = new MutableBoolean(false);
individualDeletedMessages.forEachRawRange((lowerKey, lowerValue,
upperKey, upperValue) -> {
+ if (rangeList.size() >= maxRanges) {
+ truncated.setTrue();
+ return false;
+ }
MessageRange messageRange = new MessageRange();
messageRange.setLowerEndpoint()
.setLedgerId(lowerKey)
@@ -3388,11 +3398,33 @@ public class ManagedCursorImpl implements ManagedCursor {
acksSerializedSize.addAndGet(messageRange.getSerializedSize());
rangeList.add(messageRange);
- return rangeList.size() <=
getConfig().getMaxUnackedRangesToPersist();
+ return true;
});
this.individualDeletedMessagesSerializedSize =
acksSerializedSize.get();
individualDeletedMessages.resetDirtyKeys();
+
+ if (truncated.booleanValue()) {
+ ledger.getFactory().getOpenTelemetryManagedCursorStats()
+ .incrementPersistUnackedRangesTruncated(this);
+ if (lastCursorDataFullyPersistable.compareAndSet(true, false))
{
+ int totalRanges = individualDeletedMessages.size();
+ log.warn()
+ .attr("totalRanges", totalRanges)
+ .attr("maxRanges", maxRanges)
+ .attr("truncated", totalRanges - rangeList.size())
+ .log("Individually deleted message ranges exceed"
+ + " managedLedgerMaxUnackedRangesToPersist."
+ + " Acknowledged messages beyond this limit are
not persisted"
+ + " and will be replayed on broker restart."
+ + " Consider raising
managedLedgerMaxUnackedRangesToPersist,"
+ + " verifying
managedLedgerPersistIndividualAckAsLongArray=true (the default),"
+ + " and setting
managedCursorInfoCompressionType=LZ4 to reduce the persisted size.");
+ }
+ } else {
+ lastCursorDataFullyPersistable.compareAndSet(false, true);
+ }
+
return rangeList;
} finally {
lock.writeLock().unlock();
@@ -3407,7 +3439,8 @@ public class ManagedCursorImpl implements ManagedCursor {
}
List<BatchedEntryDeletionIndexInfo> result = new ArrayList<>();
final var iterator = batchDeletedIndexes.entrySet().iterator();
- while (iterator.hasNext() && result.size() <
getConfig().getMaxBatchDeletedIndexToPersist()) {
+ int maxIndexes = getConfig().getMaxBatchDeletedIndexToPersist();
+ while (iterator.hasNext() && result.size() < maxIndexes) {
final var entry = iterator.next();
BatchedEntryDeletionIndexInfo batchDeletedIndexInfo = new
BatchedEntryDeletionIndexInfo();
batchDeletedIndexInfo.setPosition()
@@ -3419,6 +3452,27 @@ public class ManagedCursorImpl implements ManagedCursor {
}
result.add(batchDeletedIndexInfo);
}
+
+ if (iterator.hasNext()) {
+ ledger.getFactory().getOpenTelemetryManagedCursorStats()
+ .incrementPersistBatchDeletedIndexesTruncated(this);
+ if (lastBatchDeletedIndexFullyPersistable.compareAndSet(true,
false)) {
+ int totalIndexes = batchDeletedIndexes.size();
+ log.warn()
+ .attr("totalIndexes", totalIndexes)
+ .attr("maxIndexes", maxIndexes)
+ .attr("truncated", totalIndexes - result.size())
+ .log("Batch deleted indexes exceed"
+ + " managedLedgerMaxBatchDeletedIndexToPersist."
+ + " Partially acknowledged batch messages beyond
this limit are not persisted"
+ + " and will be replayed on broker restart."
+ + " Consider raising
managedLedgerMaxBatchDeletedIndexToPersist"
+ + " and setting
managedCursorInfoCompressionType=LZ4 to reduce the persisted size.");
+ }
+ } else {
+ lastBatchDeletedIndexFullyPersistable.compareAndSet(false,
true);
+ }
+
return result;
} finally {
lock.readLock().unlock();
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index b6c7d5b0629..60e972e561b 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -140,6 +140,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
private final OpenTelemetryManagedLedgerCacheStats openTelemetryCacheStats;
@Getter
private final OpenTelemetryManagedLedgerStats
openTelemetryManagedLedgerStats;
+ @Getter
private final OpenTelemetryManagedCursorStats
openTelemetryManagedCursorStats;
//indicate whether shutdown() is called.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedCursorStats.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedCursorStats.java
index ec73c9d5e5e..cb544f3b190 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedCursorStats.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedCursorStats.java
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.mledger.impl;
import com.google.common.collect.Streams;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.metrics.BatchCallback;
+import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -55,6 +56,16 @@ public class OpenTelemetryManagedCursorStats implements AutoCloseable {
public static final String INCOMING_BYTE_COUNTER =
"pulsar.broker.managed_ledger.cursor.incoming.size";
private final ObservableLongMeasurement incomingByteCounter;
+ // Broker-level counters incremented when cursor persistence silently
truncates ack state.
+ // See managedLedgerMaxUnackedRangesToPersist and
managedLedgerMaxBatchDeletedIndexToPersist.
+ public static final String PERSIST_UNACKED_RANGES_TRUNCATED =
+
"pulsar.broker.managed_ledger.cursor.persist.unacked_ranges.truncated";
+ private final LongCounter persistUnackedRangesTruncated;
+
+ public static final String PERSIST_BATCH_DELETED_INDEXES_TRUNCATED =
+
"pulsar.broker.managed_ledger.cursor.persist.batch_deleted_indexes.truncated";
+ private final LongCounter persistBatchDeletedIndexesTruncated;
+
private final BatchCallback batchCallback;
public OpenTelemetryManagedCursorStats(OpenTelemetry openTelemetry,
ManagedLedgerFactoryImpl factory) {
@@ -96,6 +107,22 @@ public class OpenTelemetryManagedCursorStats implements AutoCloseable {
.setDescription("The total amount of data read from the
ledger.")
.buildObserver();
+ persistUnackedRangesTruncated = meter
+ .counterBuilder(PERSIST_UNACKED_RANGES_TRUNCATED)
+ .setUnit("{truncation}")
+ .setDescription("The number of times a cursor exceeded"
+ + " managedLedgerMaxUnackedRangesToPersist, causing
ack state to be truncated"
+ + " at persistence. Ack state beyond the limit is lost
on broker restart.")
+ .build();
+
+ persistBatchDeletedIndexesTruncated = meter
+ .counterBuilder(PERSIST_BATCH_DELETED_INDEXES_TRUNCATED)
+ .setUnit("{truncation}")
+ .setDescription("The number of times a cursor exceeded"
+ + " managedLedgerMaxBatchDeletedIndexToPersist,
causing batch deleted index state"
+ + " to be truncated at persistence. State beyond the
limit is lost on broker restart.")
+ .build();
+
batchCallback = meter.batchCallback(() -> factory.getManagedLedgers()
.values()
.stream()
@@ -115,6 +142,14 @@ public class OpenTelemetryManagedCursorStats implements AutoCloseable {
batchCallback.close();
}
+ /**
+ * Adds 1 to the {@link #PERSIST_UNACKED_RANGES_TRUNCATED} counter, attributed with
+ * {@code cursor}'s managed-cursor attributes. ManagedCursorImpl calls this each time the
+ * list of individually deleted message ranges it builds is cut off at
+ * managedLedgerMaxUnackedRangesToPersist.
+ *
+ * @param cursor the cursor whose ack state was truncated; supplies the metric attributes
+ */
+ public void incrementPersistUnackedRangesTruncated(ManagedCursor cursor) {
+ persistUnackedRangesTruncated.add(1,
cursor.getManagedCursorAttributes().getAttributes());
+ }
+
+ /**
+ * Adds 1 to the {@link #PERSIST_BATCH_DELETED_INDEXES_TRUNCATED} counter, attributed with
+ * {@code cursor}'s managed-cursor attributes. ManagedCursorImpl calls this each time the
+ * list of batch deleted indexes it builds is cut off at
+ * managedLedgerMaxBatchDeletedIndexToPersist.
+ *
+ * @param cursor the cursor whose batch index state was truncated; supplies the metric attributes
+ */
+ public void incrementPersistBatchDeletedIndexesTruncated(ManagedCursor
cursor) {
+ persistBatchDeletedIndexesTruncated.add(1,
cursor.getManagedCursorAttributes().getAttributes());
+ }
+
private void recordMetrics(ManagedCursor cursor) {
var stats = cursor.getStats();
var cursorAttributesSet = cursor.getManagedCursorAttributes();
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 1e9fbdb5053..a3eb4756335 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -42,6 +42,8 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
+import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
import java.lang.reflect.Field;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
@@ -119,6 +121,7 @@ import org.apache.bookkeeper.mledger.proto.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.proto.PositionInfo;
import org.apache.bookkeeper.mledger.util.ManagedLedgerTestUtil;
import org.apache.bookkeeper.mledger.util.ManagedLedgerUtils;
+import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.commons.collections4.iterators.EmptyIterator;
import org.apache.commons.lang3.mutable.MutableBoolean;
@@ -6032,6 +6035,147 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
assertEquals(properties.get(propertyKey), lastIndex - 1);
}
+ @DataProvider(name = "rangesTruncationScenarios")
+ public static Object[][] rangesTruncationScenarios() {
+ // maxRanges, totalEntries, shouldTruncate.
+ // The test acks every odd-indexed entry, so N entries leave N/2 isolated acked ranges.
+ return new Object[][] {
+ { 5, 16, true }, // 8 ack holes, above limit 5 → truncation
+ { 5, 12, true }, // 6 ack holes, one above limit 5 → truncation; exactly 5 must be persisted
+ { 5, 10, false }, // 5 ack holes, exactly at limit 5 → no truncation
+ { 10, 6, false }, // 3 ack holes, limit 10 → no truncation
+ { 5, 6, false }, // 3 ack holes, limit 5 → no truncation
+ };
+ }
+
+ /**
+ * Creates alternating acked entries on a cursor limited to {@code maxRanges} persisted
+ * unacked ranges, closes the ledger, and checks PERSIST_UNACKED_RANGES_TRUNCATED.
+ * When truncation is expected, the ledger is reopened and the recovered cursor must hold
+ * exactly {@code maxRanges} individually deleted ranges; otherwise the counter must be 0.
+ */
+ @Test(timeOut = 20000, dataProvider = "rangesTruncationScenarios")
+ public void testPersistUnackedRangesTruncatedCounter(int maxRanges, int
totalEntries, boolean shouldTruncate)
+ throws Exception {
+ @Cleanup
+ InMemoryMetricReader metricReader = InMemoryMetricReader.create();
+ @Cleanup
+ var openTelemetry = AutoConfiguredOpenTelemetrySdk.builder()
+ .disableShutdownHook()
+ .addPropertiesSupplier(() -> Map.of("otel.metrics.exporter",
"none",
+ "otel.traces.exporter", "none",
+ "otel.logs.exporter", "none"))
+ .addMeterProviderCustomizer((builder, __) ->
builder.registerMetricReader(metricReader))
+ .build()
+ .getOpenTelemetrySdk();
+
+ @Cleanup("shutdown")
+ ManagedLedgerFactoryImpl otelFactory = new ManagedLedgerFactoryImpl(
+ metadataStore,
+ (policyConfig) -> CompletableFuture.completedFuture(bkc),
+ new ManagedLedgerFactoryConfig(), NullStatsLogger.INSTANCE,
openTelemetry);
+
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxUnackedRangesToPersist(maxRanges);
+ // Force persistence through the ledger path (not metadata store).
+ config.setMaxUnackedRangesToPersistInMetadataStore(0);
+
+ String ledgerName =
"my-tenant/my-ns/persistent/test-persist-unacked-ranges-truncated-" +
UUID.randomUUID();
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
otelFactory.open(ledgerName, config);
+ ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1");
+
+ List<Position> positions = new ArrayList<>();
+ for (int i = 0; i < totalEntries; i++) {
+ positions.add(ledger.addEntry(("entry-" + i).getBytes(Encoding)));
+ }
+ // Ack alternating positions to create ack holes in the cursor.
+ for (int i = 1; i < positions.size(); i += 2) {
+ cursor.delete(positions.get(i));
+ }
+
+ ledger.close();
+
+ long truncationCount = metricReader.collectAllMetrics().stream()
+ .filter(m ->
OpenTelemetryManagedCursorStats.PERSIST_UNACKED_RANGES_TRUNCATED.equals(m.getName()))
+ .flatMap(m -> m.getLongSumData().getPoints().stream())
+ .mapToLong(point -> point.getValue())
+ .sum();
+
+ // Direction only: persist cadence during close() is not a stable
contract.
+ if (shouldTruncate) {
+ assertTrue(truncationCount >= 1, "expected truncation, was " +
truncationCount);
+
+ ManagedLedgerImpl reopened = (ManagedLedgerImpl)
otelFactory.open(ledgerName, config);
+ ManagedCursorImpl recovered = (ManagedCursorImpl)
reopened.openCursor("c1");
+
assertEquals(recovered.getIndividuallyDeletedMessagesSet().asRanges().size(),
maxRanges,
+ "persisted range count must equal maxRanges");
+ } else {
+ assertEquals(truncationCount, 0L, "expected no truncation, was " +
truncationCount);
+ }
+ }
+
+ @DataProvider(name = "batchIndexesTruncationScenarios")
+ public static Object[][] batchIndexesTruncationScenarios() {
+ // maxBatchIndexes, totalEntries, shouldTruncate.
+ // The test partially acks every entry, so totalEntries is also the batch-deleted-index count.
+ return new Object[][] {
+ { 5, 16, true }, // 16 batch entries, above limit 5 → truncation
+ { 5, 6, true }, // 6 batch entries, one above limit 5 → truncation
+ { 10, 6, false }, // 6 batch entries, limit 10 → no truncation
+ { 5, 5, false }, // 5 batch entries at exact limit → no truncation
+ };
+ }
+
+ /**
+ * Partially acks every entry on a cursor limited to {@code maxBatchIndexes} persisted batch
+ * deleted indexes, closes the ledger, and checks PERSIST_BATCH_DELETED_INDEXES_TRUNCATED:
+ * at least 1 when truncation is expected, exactly 0 otherwise. Unlike the unacked-ranges
+ * test, the recovered state is not inspected after reopening.
+ */
+ @Test(timeOut = 20000, dataProvider = "batchIndexesTruncationScenarios")
+ public void testPersistBatchDeletedIndexesTruncatedCounter(int
maxBatchIndexes, int totalEntries,
+ boolean shouldTruncate) throws Exception {
+ @Cleanup
+ InMemoryMetricReader metricReader = InMemoryMetricReader.create();
+ @Cleanup
+ var openTelemetry = AutoConfiguredOpenTelemetrySdk.builder()
+ .disableShutdownHook()
+ .addPropertiesSupplier(() -> Map.of("otel.metrics.exporter",
"none",
+ "otel.traces.exporter", "none",
+ "otel.logs.exporter", "none"))
+ .addMeterProviderCustomizer((builder, __) ->
builder.registerMetricReader(metricReader))
+ .build()
+ .getOpenTelemetrySdk();
+
+ @Cleanup("shutdown")
+ ManagedLedgerFactoryImpl otelFactory = new ManagedLedgerFactoryImpl(
+ metadataStore,
+ (policyConfig) -> CompletableFuture.completedFuture(bkc),
+ new ManagedLedgerFactoryConfig(), NullStatsLogger.INSTANCE,
openTelemetry);
+
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxBatchDeletedIndexToPersist(maxBatchIndexes);
+ config.setDeletionAtBatchIndexLevelEnabled(true);
+ config.setMaxUnackedRangesToPersistInMetadataStore(0);
+
+ String ledgerName =
"my-tenant/my-ns/persistent/test-persist-batch-deleted-indexes-truncated-"
+ + UUID.randomUUID();
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
otelFactory.open(ledgerName, config);
+ ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1");
+
+ List<Position> positions = new ArrayList<>();
+ for (int i = 0; i < totalEntries; i++) {
+ positions.add(ledger.addEntry(("entry-" + i).getBytes(Encoding)));
+ }
+ // Partial-ack each position so it lands in batchDeletedIndexes rather
than individualDeletedMessages.
+ for (Position position : positions) {
+ BitSet ackSet = new BitSet(10);
+ ackSet.set(0, 10);
+ ackSet.clear(0);
+ cursor.delete(AckSetStateUtil.createPositionWithAckSet(
+ position.getLedgerId(), position.getEntryId(),
ackSet.toLongArray()));
+ }
+
+ ledger.close();
+
+ long truncationCount = metricReader.collectAllMetrics().stream()
+ .filter(m ->
OpenTelemetryManagedCursorStats.PERSIST_BATCH_DELETED_INDEXES_TRUNCATED
+ .equals(m.getName()))
+ .flatMap(m -> m.getLongSumData().getPoints().stream())
+ .mapToLong(point -> point.getValue())
+ .sum();
+
+ // Direction only: persist cadence during close() is not a stable
contract.
+ if (shouldTruncate) {
+ assertTrue(truncationCount >= 1, "expected truncation, was " +
truncationCount);
+ } else {
+ assertEquals(truncationCount, 0L, "expected no truncation, was " +
truncationCount);
+ }
+ }
+
@SuppressWarnings("try")
class TestPulsarMockBookKeeper extends PulsarMockBookKeeper {
Map<Long, Integer> ledgerErrors = new HashMap<>();