void-ptr974 opened a new pull request, #26024:
URL: https://github.com/apache/pulsar/pull/26024
### Motivation
`PendingAcksMap` is used on the individual-ack dispatch path for Shared and
Key_Shared consumers. It currently stores pending entries as:
`TreeMap<Long, TreeMap<Long, IntIntPair>>`
The outer ledger ordering is still useful for range removal, but the inner
entry map does not require ordering. Keeping a `TreeMap<Long, IntIntPair>` per
ledger adds per-entry object overhead from boxed entry IDs, tree nodes, and
`IntIntPair` values.
This change keeps the outer ledger ordering and replaces the inner entry
storage with primitive `long -> long` storage. The pending ack value stores
`(remainingUnacked, stickyKeyHash)` in a packed `long`, reducing allocation,
object count, and GC pressure on common pending-ack operations.
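The packed representation can be sketched roughly as below. This is only an illustration: the class name `PackedPendingAck`, its method names, and the bit layout (`remainingUnacked` in the high 32 bits, `stickyKeyHash` in the low 32 bits) are assumptions, since the description does not spell out the layout the PR actually uses.

```java
/**
 * Hypothetical helper for illustration only; the names and the bit layout
 * are assumptions, not the PR's actual code.
 */
final class PackedPendingAck {
    private PackedPendingAck() {
    }

    /** Packs two ints into one long: remainingUnacked high, stickyKeyHash low. */
    static long pack(int remainingUnacked, int stickyKeyHash) {
        // Mask the low half so a negative hash does not sign-extend into the high half.
        return ((long) remainingUnacked << 32) | (stickyKeyHash & 0xFFFFFFFFL);
    }

    /** Recovers the high 32 bits; the arithmetic shift preserves the sign. */
    static int remainingUnacked(long packed) {
        return (int) (packed >> 32);
    }

    /** Recovers the low 32 bits by truncating to int. */
    static int stickyKeyHash(long packed) {
        return (int) packed;
    }
}
```

With a layout like this, updating `remainingUnacked` only rewrites a primitive `long` in place, which would be consistent with the near-zero allocation reported for `updateRemainingUnacked`.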
### Modifications
- Replace the inner pending ack map with `Long2LongOpenHashMap`.
- Store `(remainingUnacked, stickyKeyHash)` as a packed `long`.
- Keep `PendingAcksMap` public behavior unchanged by unpacking values at API
and callback boundaries.
- Add `Long2LongMap` / `Long2LongOpenHashMap` with focused unit tests.
- Update `PendingAcksMapTest` to cover packed value round trips, update,
removal, and range-removal behavior.
- Add `PendingAcksMapBenchmark` with old-vs-new implementations for common
operations.
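As a rough sketch of why the outer map stays ordered while the inner one does not need to be, the following models the two-level layout. It is not the PR's code: `PendingAcksLayoutSketch` and its methods are hypothetical, a plain `HashMap<Long, Long>` stands in for `Long2LongOpenHashMap` so the example is self-contained (the real change avoids exactly this boxing), and inclusive `removeAllUpTo` semantics are assumed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of the layout; HashMap is a stand-in for the primitive map. */
final class PendingAcksLayoutSketch {
    // Outer map is sorted by ledger ID so whole ledgers can be dropped by range.
    private final TreeMap<Long, Map<Long, Long>> byLedger = new TreeMap<>();

    void addOrReplace(long ledgerId, long entryId, long packedValue) {
        byLedger.computeIfAbsent(ledgerId, k -> new HashMap<>()).put(entryId, packedValue);
    }

    boolean contains(long ledgerId, long entryId) {
        Map<Long, Long> entries = byLedger.get(ledgerId);
        return entries != null && entries.containsKey(entryId);
    }

    /** Removes everything up to and including (ledgerId, entryId); inclusivity is an assumption. */
    void removeAllUpTo(long ledgerId, long entryId) {
        // Ledgers strictly below the boundary go away in one ordered range operation.
        byLedger.headMap(ledgerId, false).clear();
        // Only the boundary ledger needs a scan, and that scan does not need ordering.
        Map<Long, Long> entries = byLedger.get(ledgerId);
        if (entries != null) {
            entries.keySet().removeIf(id -> id <= entryId);
            if (entries.isEmpty()) {
                byLedger.remove(ledgerId);
            }
        }
    }

    int size() {
        int total = 0;
        for (Map<Long, Long> entries : byLedger.values()) {
            total += entries.size();
        }
        return total;
    }
}
```

Point lookups touch one ordered outer lookup plus one hash lookup, and range removal relies only on the ordering of ledger IDs.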
### Benchmark methodology
The benchmark is implemented in `PendingAcksMapBenchmark` and keeps both
implementations in the same benchmark class:
- `oldProduction`: the previous `TreeMap<Long, TreeMap<Long, IntIntPair>>`
layout
- `production`: the new `TreeMap<Long, Long2LongOpenHashMap>` layout
Both implementations use the same generated ledger IDs, entry IDs,
`remainingUnacked`, and `stickyKeyHash` values for each dataset. The benchmark
runs with JMH average-time mode and `-prof gc` to report normalized allocation.
Build:
```bash
./gradlew :microbench:shadowJar
```
Run common operations:
```bash
java -jar microbench/build/libs/microbench-5.0.0-M1-SNAPSHOT-benchmarks.jar \
'PendingAcksMapBenchmark\.(addOrReplace|containsHit|getRemainingUnackedHit|updateRemainingUnacked|removeAndAddRemaining)' \
-p implementation=oldProduction,production \
-p dataset=64kEntries1kLedgers,1mEntries16kLedgers \
-wi 2 -i 2 -w 1s -r 1s -f 1 \
-prof gc \
-rf csv -rff /tmp/pending_acks_common.csv
```
Run full-iteration and range operations separately:
```bash
java -jar microbench/build/libs/microbench-5.0.0-M1-SNAPSHOT-benchmarks.jar \
'PendingAcksMapBenchmark\.(forEachAll|removeAllUpTo|populate)' \
-p implementation=oldProduction,production \
-p dataset=64kEntries1kLedgers,1mEntries16kLedgers \
-wi 1 -i 1 -w 1s -r 1s -f 1 \
-prof gc \
-rf csv -rff /tmp/pending_acks_range.csv
```
The reported numbers are sample results from the same local machine and JVM.
They should be read as directional JMH data rather than absolute production
latency.
### Benchmark results
Sample JMH results from `PendingAcksMapBenchmark`:
| Benchmark | Dataset | Old | New | Change |
|---|---:|---:|---:|---:|
| `addOrReplace` | 64k entries / 1k ledgers | 131.017 ns/op | 67.181 ns/op | -48.7% |
| `containsHit` | 64k / 1k | 83.872 ns/op | 63.139 ns/op | -24.7% |
| `getRemainingUnackedHit` | 64k / 1k | 85.828 ns/op | 71.299 ns/op | -16.9% |
| `updateRemainingUnacked` | 64k / 1k | 129.768 ns/op | 63.006 ns/op | -51.4% |
| `removeAndAddRemaining` | 64k / 1k | 214.540 ns/op | 151.922 ns/op | -29.2% |
| `forEachAll` | 64k / 1k | 711.515 us/op | 125.063 us/op | -82.4% |
| `removeAllUpTo` | 64k / 1k | 395.256 us/op | 186.338 us/op | -52.9% |
| `populate` | 64k / 1k | 7.282 ms/op | 5.534 ms/op | -24.0% |
| `addOrReplace` | 1m entries / 16k ledgers | 466.693 ns/op | 257.576 ns/op | -44.8% |
| `getRemainingUnackedHit` | 1m / 16k | 252.271 ns/op | 199.319 ns/op | -21.0% |
| `updateRemainingUnacked` | 1m / 16k | 484.468 ns/op | 190.575 ns/op | -60.7% |
| `removeAndAddRemaining` | 1m / 16k | 801.179 ns/op | 352.592 ns/op | -56.0% |
| `forEachAll` | 1m / 16k | 11.335 ms/op | 5.362 ms/op | -52.7% |
| `removeAllUpTo` | 1m / 16k | 6.924 ms/op | 4.144 ms/op | -40.2% |
| `populate` | 1m / 16k | 283.612 ms/op | 197.176 ms/op | -30.5% |
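The Change column appears to be the relative difference `(new - old) / old`, so negative values mean the new layout is faster. A minimal sketch of that arithmetic follows; the class and method names are hypothetical and not part of the benchmark.

```java
/** Hypothetical helper showing how a Change value can be derived from Old and New. */
final class ChangeColumn {
    private ChangeColumn() {
    }

    /** Relative change in percent; negative means the new value is lower. */
    static double percentChange(double oldValue, double newValue) {
        return (newValue - oldValue) / oldValue * 100.0;
    }
}
```

For example, `ChangeColumn.percentChange(131.017, 67.181)` rounds to -48.7, matching the first `addOrReplace` row.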
Allocation examples (normalized allocation per operation, from `-prof gc`):
| Benchmark | Dataset | Old | New |
|---|---:|---:|---:|
| `addOrReplace` | 64k / 1k | 45.001 B/op | 21.000 B/op |
| `addOrReplace` | 1m / 16k | 47.815 B/op | 23.814 B/op |
| `updateRemainingUnacked` | 64k / 1k | 24.001 B/op | ~0 B/op |
| `updateRemainingUnacked` | 1m / 16k | 24.003 B/op | ~0 B/op |
| `removeAndAddRemaining` | 64k / 1k | 85.001 B/op | 21.001 B/op |
| `removeAndAddRemaining` | 1m / 16k | 87.817 B/op | 23.815 B/op |
### Object size
Measured with JOL `GraphLayout.totalSize()`.
The scenarios below are based on the following default client and broker limits:
- `receiverQueueSize = 1000`
- `maxUnackedMessagesPerConsumer = 50000`
- `managedLedgerMaxEntriesPerLedger = 50000`
`PendingAcksMap` stores pending entries, not messages. With batching, the
entry count is lower than the unacked message count.
| Scenario | Entries / ledgers | Old | New | Change |
|---|---:|---:|---:|---:|
| Healthy consumer window | 1,000 / 1 | 86.3 KiB | 34.4 KiB | -60.2% |
| Healthy window, avg batch size 10 | 100 / 1 | 8.9 KiB | 4.6 KiB | -48.3% |
| Slow consumer, about half of default unacked limit | 25,000 / 1 | 2.10 MiB | 1.06 MiB | -49.3% |
| Slow consumer at default unacked limit | 50,000 / 1 | 4.20 MiB | 2.13 MiB | -49.4% |
| Slow consumer at default limit across a ledger boundary | 50,000 / 2 | 4.20 MiB | 2.13 MiB | -49.4% |
| Slow consumer at default limit across a few ledgers | 50,000 / 5 | 4.20 MiB | 1.33 MiB | -68.3% |
In normal high-throughput topics, the pending ack window is usually
concentrated in the active ledger or a small number of recent ledgers. A sparse
distribution with long-lived residual pending entries across many ledgers can
be optimized separately if needed.
### Verification
- `./gradlew :pulsar-broker:test --tests org.apache.pulsar.broker.service.PendingAcksMapTest`
- `./gradlew :pulsar-common:test --tests org.apache.pulsar.common.util.collections.Long2LongOpenHashMapTest`
- `./gradlew :pulsar-broker:checkstyleMain :pulsar-common:checkstyleMain`
- `./gradlew :pulsar-broker:checkstyleTest`
- `./gradlew :pulsar-common:checkstyleTest`