lhotari opened a new issue, #25985:
URL: https://github.com/apache/pulsar/issues/25985

   ### Search before reporting
   
   - [x] I searched the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.
   
   ### Version
   
   `managedLedgerPersistIndividualAckAsLongArray=true` is the **default since Pulsar 4.1.x** and is also available
   in 3.0.x / 3.3.x / 4.0.x (it was added in patch releases). The long-array format was introduced by
   [PR #9292](https://github.com/apache/pulsar/pull/9292).
   
   ## Summary
   
   When a subscription accumulates out-of-order acknowledgments (Shared / Key_Shared subscriptions, negative
   acknowledgments, delayed delivery, retry/DLQ), the cursor's individually-deleted message ranges are persisted
   with the long-array format (`managedLedgerPersistIndividualAckAsLongArray=true`). **That format stores a dense
   bit array whose size is proportional to the backlog (the entry-id span the cursor covers), not to the number
   of acknowledgment holes.** Because the BookKeeper cursor-ledger path is **not compressed**, the persisted state
   reaches the BookKeeper single-entry frame limit (`nettyMaxFrameSizeBytes`, ~5 MB by default) at a backlog of
   roughly **30 M entries**, independent of how few ack holes actually exist.
   
   Once the state no longer fits, the cursor can no longer persist the individual deleted message state, and that
   state is effectively lost on a broker restart or load shedding (topic unload). Any backlog larger than ~30 M
   entries (the point where the dense state reaches the ~5 MB limit) is affected. As an example of a bad
   scenario, consider a **50 M-entry backlog** in which every message **except the oldest** has been acknowledged:
   the oldest message keeps the mark-delete position pinned while ~50 M individual acknowledgments accumulate, the
   state cannot be persisted, and after a restart or unload **all ~50 M messages are redelivered to consumers**.
   
   ## What happens
   
   `individualDeletedMessages` is held in memory as a RoaringBitmap-backed range set (`OpenLongPairRangeSet` with
   `RoaringBitSet`). When `managedLedgerPersistIndividualAckAsLongArray=true`, it is serialized for persistence via
   `RoaringBitSet.toLongArray()` into `individualDeletedMessageRanges` (a
   `repeated LongListMap { int64 key; repeated int64 values }`, one `LongListMap` per ledger;
   `ManagedCursorImpl#buildLongPropertiesMap`).
   
   `RoaringBitSet.toLongArray()` follows the `java.util.BitSet.toLongArray()` contract: it returns a **dense**
   `long[]` of length `highestSetBit/64 + 1`, i.e. **one 64-bit word for every 64 entry-ids up to the highest
   acknowledged entry-id in that ledger, including all-zero interior/leading words**. RoaringBitmap's own compact
   serialization (`RoaringBitmap.serialize`, which stores array/bitmap/run *containers*) is bypassed.
   
   As a result the serialized size is governed by the **per-ledger entry-id 
span** (the backlog), not by the
   number of ack holes. The same applies to the per-batch `deleteSet` in 
`BatchedEntryDeletionIndexInfo`
   (`acknowledgmentAtBatchIndexLevelEnabled=true`, also a default), which is 
serialized into the same entry.
   
   The cursor ledger is stored in BookKeeper without compression (compression is only applied to the
   metadata-store `ManagedCursorInfo`), so the dense bytes are unmitigated there. A ~30 M-entry backlog already
   serializes to ~5 MB, exceeding `nettyMaxFrameSizeBytes`. Every mark-delete flush (by default once per second)
   then first attempts to store the state in the BookKeeper cursor ledger, fails because of the frame-size
   limit, and falls back to the metadata store (where it may also exceed `jute.maxbuffer`). The managed ledger
   can additionally enter a bad state on cursor-ledger rollover once the state can no longer be stored.
   
   ## Impact
   
   1. **Lost acknowledgment state → mass redelivery.** Once the individual 
deleted message state cannot be
      persisted, it is effectively lost on a broker restart or load shedding 
(topic unload): the mark-delete
      position is recovered but the individual acks after it are gone, so all 
those messages are redelivered. For
      example, with a 50 M-entry backlog acknowledged except for the oldest 
message (any backlog beyond the ~30 M
      limit fails to persist), **all ~50 M messages are redelivered to 
consumers**.
   2. **Hard backlog ceiling (~30 M entries).** With the dense encoding, the ack state that fits in a single
      ~5 MB BookKeeper entry caps out at a backlog of ~30 M entries, regardless of ack-hole count; past that,
      the BookKeeper add exceeds `nettyMaxFrameSizeBytes` and fails.
   3. **The persistence count cap does not protect against it.** 
`managedLedgerMaxUnackedRangesToPersist` caps the
      number of ranges (set bits / cardinality), not bytes; the byte cost is 
driven by the entry-id span, so the
      cap cannot bound the serialized size.
   4. **Per-flush latency and overhead.** Every mark-delete flush (default: once per second) first attempts to
      persist to the cursor ledger (BookKeeper), fails because of the frame-size limit, and then falls back to
      the metadata store, adding latency and overhead on every flush. The managed ledger can also enter a bad
      state on cursor-ledger rollover.
   5. **Heap / GC pressure on every flush.** Serialization materializes the 
entire dense `long[]` (and
      deserialization rebuilds it) on every mark-delete flush, regardless of 
how few holes exist.
   
   ## Measurements
   
   Measured with a size-characterization test built on the real 
`RoaringBitSet`-backed range set and the generated
   `MLDataFormats` messages. Ack holes are spread across the backlog with ~50% 
variance in the gap; the dense size
   is essentially independent of hole density:
   
   | backlog (entries) | dense `long[]` (BK, uncompressed) | dense transient heap / flush |
   |---:|---:|---:|
   | 1 M   | ~0.16 MB | ~0.12 MB |
   | 10 M  | ~1.63 MB | ~1.19 MB |
   | 100 M | ~16.3 MB | ~11.9 MB |
   
   Projected backlog ceiling for a single ~5 MB BK entry, current dense 
encoding (≈ independent of ack-hole
   density): **~30.5 M – ~30.8 M entries**.
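
The projected ceiling can be cross-checked by linear extrapolation from the table. This is a rough sketch under two assumptions that are not measurements: the serialized size scales linearly with the backlog, and the frame limit is taken as exactly 5 MB. The class and method names are hypothetical.

```java
public class BacklogCeilingEstimate {

    // Linear extrapolation: how many entries fit in limitMb, given a
    // measured serialized size (measuredMb) at a measured backlog.
    static double ceilingEntries(double measuredMb, double measuredBacklog, double limitMb) {
        double mbPerEntry = measuredMb / measuredBacklog;
        return limitMb / mbPerEntry;
    }

    public static void main(String[] args) {
        // Table row: 100 M entries serialize to ~16.3 MB.
        double ceiling = ceilingEntries(16.3, 100_000_000, 5.0);
        // Lands at roughly 30.7 M entries, inside the projected range.
        System.out.printf("~%.1f M entries%n", ceiling / 1_000_000);
    }
}
```

The 10 M row (~1.63 MB) gives the same result, since both rows imply about 0.163 bytes per backlog entry.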
   
   For comparison, the same state serialized with `RoaringBitmap.serialize` (run-optimized) is far smaller and
   independent of the backlog span: for example, ~0.43 MB vs ~16.4 MB at a 100 M backlog with 0.1% unacked.
   
   ## Workarounds
   
   There is no complete workaround; the following only adjust the behavior 
slightly and do not change the fact
   that the storage size scales with the backlog:
   
   - Increase `managedLedgerMaxUnackedRangesToPersistInMetadataStore` to reduce 
noisy logs.
   - Adjust `managedLedgerDefaultMarkDeleteRateLimit` (default: once per 
second) to control how frequently the
     state is persisted.
   
   ## Possible directions
   
   Encode the acknowledgment state so its size tracks the number of ack holes 
rather than the backlog (for
   example, serialize the RoaringBitmap directly instead of a dense `long[]`), 
and/or compress the cursor-ledger
   entries the same way `ManagedCursorInfo` is compressed in the metadata store.
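
To show what "size tracks the number of ack holes" means, here is a deliberately simplified run encoding built on `java.util.BitSet`. It is not RoaringBitmap's serialization format and not a proposed wire format; it only stores each acknowledged run as a `[start, endExclusive)` pair. All class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

public class RunEncodingSketch {

    // Encodes set bits as [start, endExclusive) pairs.
    // Output length is 2 * (number of contiguous acknowledged runs).
    static long[] encodeRuns(BitSet acked) {
        List<Long> out = new ArrayList<>();
        for (int start = acked.nextSetBit(0); start >= 0; ) {
            int end = acked.nextClearBit(start);
            out.add((long) start);
            out.add((long) end);
            start = acked.nextSetBit(end);
        }
        return out.stream().mapToLong(Long::longValue).toArray();
    }

    // Rebuilds the bit set from the run pairs.
    static BitSet decodeRuns(long[] runs) {
        BitSet acked = new BitSet();
        for (int i = 0; i < runs.length; i += 2) {
            acked.set((int) runs[i], (int) runs[i + 1]);
        }
        return acked;
    }

    // Test data: everything acknowledged except one hole every `holeEvery` entries.
    static BitSet backlogWithHoles(int backlog, int holeEvery) {
        BitSet acked = new BitSet(backlog);
        acked.set(0, backlog);
        for (int i = 0; i < backlog; i += holeEvery) {
            acked.clear(i);
        }
        return acked;
    }

    public static void main(String[] args) {
        // 1,000 holes in both cases; only the backlog differs by 10x.
        BitSet small = backlogWithHoles(1_000_000, 1_000);
        BitSet large = backlogWithHoles(10_000_000, 10_000);
        System.out.println(encodeRuns(small).length + " vs " + small.toLongArray().length); // prints 2000 vs 15625
        System.out.println(encodeRuns(large).length + " vs " + large.toLongArray().length); // prints 2000 vs 156250
    }
}
```

With the hole count held at 1,000, the run encoding stays at 2,000 longs while the dense array grows tenfold with the backlog. A container-based format such as RoaringBitmap's would be expected to behave similarly for this shape of data, but that is not measured here.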
   
   [PIP-81](https://github.com/apache/pulsar/blob/master/pip/pip-81.md) (split the individual acknowledgments into
   multiple entries) and [PIP-381](https://github.com/apache/pulsar/blob/master/pip/pip-381.md) (handle large
   `PositionInfo` state) are prior attempts to address the large ack-state problem in different ways. In
   particular, **PIP-81 contains a very useful idea: splitting the state handling across multiple ledger
   entries**, so that the full state does not have to be rewritten when only one part of it changes. Today the
   **complete** state is persisted on every mark-delete flush (by default once per second); with a split,
   multi-entry representation, only the changed portion would need to be appended. Combined with RoaringBitmap
   serialization and a compression algorithm (LZ4, or an integer-compression algorithm), the entries would be
   much smaller, and the per-entry limits could be derived from worst-case storage-size bounds so that each entry
   holds the unacknowledged state for a bounded range of the backlog.
   
   ## References
   
   - Long-array format introduced by: [PR #9292](https://github.com/apache/pulsar/pull/9292)
   - Cursor-info compression (metadata store only): PIP-146
   - Prior attempts at the large ack-state problem: PIP-81, PIP-381
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!
   

