hongkunxu opened a new issue, #16584: URL: https://github.com/apache/pinot/issues/16584
# RFC: Broker and Server Segment Query Cache for Apache Pinot

- Status: Draft 0.1
- Authors: Xiang Fu, Hongkun Xu
- Created: 2025-08-13
- Target Release: TBD
- Discussion Thread: (link after posting to [email protected])

## 0. Executive Summary

This RFC proposes a two-layer query caching feature for Apache Pinot:

1. **Broker Result Cache** – caches fully merged query results at the broker layer.
2. **Server Segment Result Cache** – caches per-segment partial results (aggregations, group-by tables, and selection blocks) at the server layer.

The design emphasizes correctness on immutable/sealed segments, pluggability of cache backends, precise invalidation, and rich observability. It draws on patterns from Apache Druid (broker + historical caches), StarRocks (backend caches), and ClickHouse (final result cache + block caches) while fitting Pinot's execution model.

## 1. Background & Motivation

Pinot executes every query end-to-end even when the inputs and the plan are unchanged. In dashboard scenarios, identical queries recur at 5–60s intervals and create avoidable CPU and IO load. Immutable segments (offline and sealed realtime) are natural candidates for result reuse.

Goals of this RFC:

- Reduce p95/p99 latency and cluster cost for repetitive workloads.
- Provide predictable correctness via strong versioning and conservative defaults.
- Offer a pluggable SPI and configuration knobs to match diverse deployments.

> Non-goals for Phase 1: ORDER BY/top-k caching, semantic/approximate matching, mutable/upsert caching.

## 2. Terminology

- Segment: Pinot data shard (offline or realtime). Realtime segments may be consuming or sealed.
- CRC/Version: Segment-level versioning/epoch used to detect content changes.
- Broker Response: Final BrokerResponseNative after merge/trim.
- Partial Result: Per-segment contribution (agg array, group-by map, or selection rows) before broker merge.

## 3. Requirements

### 3.1 Functional

- Exact-match caching of final results (broker) and partial results (server) for supported operators.
- Deterministic, canonical keys covering query + data versions + relevant options.
- Automatic invalidation on segment/schema/table-config changes and server/broker lifecycle events.
- Manual invalidation APIs for operators and SREs.
- Per-query escape hatches (disable flags) for debugging.

### 3.2 Correctness & Safety

- Enabled by default only for offline and sealed realtime segments.
- Disabled by default for consuming realtime segments and upsert tables.
- Staleness is bounded by TTL, but correctness is enforced by versioned keys.

### 3.3 Performance

- Weighted LRU with size measured in bytes, not entry count.
- Optional compression for value payloads.
- Single-flight de-duplication to mitigate thundering herds.

### 3.4 Operability

- Detailed metrics (hits/misses/bytes/latency).
- Tracing spans and cache decision annotations.
- Configurable per-table overrides.

## 4. High-Level Architecture

Two orthogonal caches:

### 4.1 Broker Result Cache

- Placement: Broker (query entry/exit).
- Key: Canonical SQL + normalized options + routing table version + participating segment epochs + schema epoch.
- Value: Serialized BrokerResponseNative (optionally compressed).
- Lookup Path: Check before dispatch → on hit, return; on miss, execute and store.
- Invalidation: Broker listens to ExternalView/segment metadata and schema/config change events to purge affected keys.
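As a non-normative illustration of the broker key composition above, the sketch below derives a cache key by hashing a canonical string built from the query, normalized options, routing version, segment epochs, and schema epoch. The class and method names (`BrokerCacheKeyBuilder`, `buildKey`) are hypothetical, not existing Pinot APIs.

```
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: builds a deterministic broker cache key from the
// components listed above. All names here are illustrative only.
public final class BrokerCacheKeyBuilder {

  public static String buildKey(String canonicalSql,
                                Map<String, String> normalizedOptions,
                                long routingTableVersion,
                                Map<String, Long> participatingSegmentEpochs,
                                long schemaEpoch) throws NoSuchAlgorithmException {
    StringBuilder sb = new StringBuilder();
    sb.append(canonicalSql).append('\n');
    // Sort options and segment epochs so the key is independent of iteration order.
    new TreeMap<>(normalizedOptions).forEach((k, v) -> sb.append(k).append('=').append(v).append(';'));
    sb.append('\n').append(routingTableVersion).append('\n');
    new TreeMap<>(participatingSegmentEpochs).forEach((seg, epoch) -> sb.append(seg).append(':').append(epoch).append(';'));
    sb.append('\n').append(schemaEpoch);

    // Digest the canonical string; any segment epoch or schema epoch change yields a new key.
    MessageDigest digest = MessageDigest.getInstance("SHA-256");
    byte[] hash = digest.digest(sb.toString().getBytes(StandardCharsets.UTF_8));
    return Base64.getEncoder().encodeToString(hash);
  }
}
```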
### 4.2 Server Segment Result Cache

- Placement: Server, per-segment, around the operator execution.
- Key Composition: the key is `HASH(...)` over:
  - `tableNameWithType`
  - `segmentName`
  - `segmentCrcOrEpoch`
  - `planSignature` (canonical operator tree)
  - `projectionSchemaSig` (columns + types used)
  - `queryOptionsSig` (null handling, response format, group trim thresholds, etc.)
  - `starTreeSig` (star-tree id/version used by the planner)
  - `timeRangeConstraintSig` (broker pruning constraints intersected with the segment)
  - `limitSig` (affects partial value shapes)
- Value Forms:
  - Aggregation (no group-by): double/long/decimal arrays per function.
  - GroupBy: compact groupKey → agg[] map (already trimmed to the server threshold).
  - Selection (no ORDER BY): encoded row block up to the per-segment limit.
- Invalidation: on segment add/remove/reload; on schema/config change; manual admin calls. For consuming segments (if enabled), the key additionally includes (partitionId, endOffset).

## 5. Detailed Design

### 5.1 Canonicalization & Signatures

**Filter AST Canonicalization:**

- Normalize commutative operators (AND/OR) by sorting children.
- Normalize predicate forms (e.g., `a IN (3,1,2)` → `IN (1,2,3)`).
- Normalize literals to internal storage types (timezone, number scale).
- Push-downs folded; redundant predicates eliminated where safe.

**Projection/Transform Signature:**

- Stable ordering by expression string; include resolved types and dictionary/transform usage.

**Aggregation Signature:**

- Function name + normalized args + params (e.g., approx_percentile(0.95) vs quantile(0.95)).
- Group-by keys listed in stable order.

**Routing/Time Constraint Signature (broker):**

- Include selected segments and their epochs; include min/max time ranges used for pruning.

### 5.2 SPI Interfaces (Java Sketch)

```
public interface QueryResultCache<K, V> {
  @Nullable
  V get(K key);
  void put(K key, V value, long weightBytes);
  void invalidate(Predicate<K> predicate);
  void clear();
}

public interface SegmentResultCache extends QueryResultCache<SegmentCacheKey, SegmentCacheValue> {
}

@Value
class SegmentCacheKey { /* fields matching the key composition above; equals/hashCode */ }

sealed interface SegmentCacheValue permits AggPart, GroupByPart, SelectionPart {
  int serializedSizeBytes();
}

final class AggPart implements SegmentCacheValue { double[] doubles; long[] longs; /* ... */ }
final class GroupByPart implements SegmentCacheValue { /* groupKey dict + value arrays + trim meta */ }
final class SelectionPart implements SegmentCacheValue { /* encoded rows */ }
```

### 5.3 Value Encoding & Compression

- Versioned headers: magic(2B) | version(1B) | type(1B) | flags(1B) | size(4B) | payload.
- Payload encodings:
  - Agg arrays: little-endian primitives; optional RLE for zero-dense arrays.
  - GroupBy: two-level structure (key dictionary + columnar agg arrays); varint lengths.
  - Selection: reuse DataTable encoding or introduce a leaner row block for the cache (TBD).
- Compression: configurable none|LZ4|ZSTD; default LZ4.

### 5.4 Memory Accounting & Eviction

- Weight = serializedSizeBytes (post-compression).
- A Caffeine Weigher returns the exact byte size to bound memory.
- Separate pools per value type are optional (phase 2).

### 5.5 Concurrency Control

- Per-key single-flight registry: only one thread populates a missing key; others await a CompletableFuture.
- Optional stale-while-revalidate (SWR): serve entries within swr.ms while a refresh is in flight (disabled by default).
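A minimal sketch of the per-key single-flight registry described in §5.5, using only the JDK; `SingleFlight` and `load` are illustrative names, not an existing Pinot component. The first caller for a missing key runs the loader; concurrent callers block on the same `CompletableFuture`.

```
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Hypothetical single-flight registry: de-duplicates concurrent loads of the
// same missing cache key. Only one thread runs the loader; others wait on the
// same future. The entry is removed once the load completes.
public final class SingleFlight<K, V> {
  private final ConcurrentMap<K, CompletableFuture<V>> _inFlight = new ConcurrentHashMap<>();

  public V load(K key, Supplier<V> loader) {
    CompletableFuture<V> future = new CompletableFuture<>();
    CompletableFuture<V> existing = _inFlight.putIfAbsent(key, future);
    if (existing != null) {
      // Another thread is already computing this key; wait for its result.
      return existing.join();
    }
    try {
      V value = loader.get();
      future.complete(value);
      return value;
    } catch (RuntimeException e) {
      // Propagate the failure to all waiters as well.
      future.completeExceptionally(e);
      throw e;
    } finally {
      _inFlight.remove(key, future);
    }
  }
}
```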
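Similarly, for the value encoding in §5.3, the following is one possible layout of the versioned header (magic | version | type | flags | size | payload) using a plain `ByteBuffer`; the constants and the `CacheValueHeader` class are illustrative placeholders, not a committed wire format.

```
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical encoder/decoder for the versioned value header from §5.3:
// magic(2B) | version(1B) | type(1B) | flags(1B) | size(4B) | payload
public final class CacheValueHeader {
  private static final short MAGIC = (short) 0xCACE;  // illustrative magic number
  private static final byte VERSION = 1;
  public static final byte TYPE_AGG = 0;
  public static final byte TYPE_GROUP_BY = 1;
  public static final byte TYPE_SELECTION = 2;
  public static final byte FLAG_LZ4 = 0x01;           // payload is LZ4-compressed

  public static byte[] wrap(byte type, byte flags, byte[] payload) {
    ByteBuffer buffer = ByteBuffer.allocate(9 + payload.length).order(ByteOrder.LITTLE_ENDIAN);
    buffer.putShort(MAGIC).put(VERSION).put(type).put(flags).putInt(payload.length).put(payload);
    return buffer.array();
  }

  public static byte[] unwrap(byte[] encoded) {
    ByteBuffer buffer = ByteBuffer.wrap(encoded).order(ByteOrder.LITTLE_ENDIAN);
    if (buffer.getShort() != MAGIC) {
      // Per §5.8: treat as a deserialization error, drop the entry, execute normally.
      throw new IllegalStateException("Bad magic in cached value");
    }
    byte version = buffer.get();  // would select the decoder for this version
    byte type = buffer.get();     // AggPart / GroupByPart / SelectionPart
    byte flags = buffer.get();    // e.g. whether to decompress the payload
    int size = buffer.getInt();
    byte[] payload = new byte[size];
    buffer.get(payload);
    return payload;
  }
}
```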
### 5.6 Integration Points

Broker:

- Entry: BrokerRequestHandler.handleRequest() → pre-check.
- Exit: after reduceAndSetExecutionStats() → store on success.
- Skip the store for partial/errored responses or when queryOptions.resultCache=false.

Server:

- Around segment operator execution (e.g., PlanNodeRunner or OpChain root).
- Before building operators, compute the planSignature.
- On hit: short-circuit and return the cached SegmentCacheValue.
- On miss: execute operators → convert to value → put().

### 5.7 Invalidation Sources & Propagation

- Segment lifecycle: addSegment/removeSegment/reloadSegment hooks in InstanceDataManager call invalidate(k -> k.segmentName.equals(...)).
- Schema/Table config: listeners produce an epoch bump kept in memory; keys include the schema/config epoch, forcing misses for old entries. Optionally bulk-invalidate by table.
- Broker routing change: the broker cache key embeds segment epochs; any change yields a different key (implicit invalidation).
- Admin APIs: server and broker expose endpoints (see §7) to clear by table/segment or clear all.

### 5.8 Failure Modes

- Cache service down (remote backend): fall back to normal execution; count as a miss; use a circuit breaker to avoid hot looping.
- Deserialization error (version mismatch): drop the entry; increment cache_value_deser_errors; execute normally.
- Memory pressure: Caffeine evicts by size; emit warnings when the hit rate falls below a threshold alongside near-OOM events.

### 5.9 Security & Multi-Tenancy

- Namespace keys by tableNameWithType.
- Per-table configuration overrides, including disablement.
- Admin APIs gated behind existing auth/role checks.

## 6. Configuration

### 6.1 Broker

```
pinot.broker.query.cache.enabled = false
pinot.broker.query.cache.backend = in-memory | redis
pinot.broker.query.cache.max.bytes = 512MB
pinot.broker.query.cache.ttl.ms = 300000
pinot.broker.query.cache.compress = lz4
pinot.broker.query.cache.singleflight.enabled = true

Per-table override: tableConfig.ingestionConfig.queryCache.broker.*
```

### 6.2 Server (Segment)

```
pinot.server.segment.query.cache.enabled = false
pinot.server.segment.query.cache.backend = in-memory | redis | rocksdb
pinot.server.segment.query.cache.max.bytes = 256MB
pinot.server.segment.query.cache.ttl.ms = 120000
pinot.server.segment.query.cache.compress = lz4
pinot.server.segment.query.cache.enable.offline = true
pinot.server.segment.query.cache.enable.realtime.sealed = true
pinot.server.segment.query.cache.enable.realtime.consuming = false
pinot.server.segment.query.cache.disable.for.upsert = true
pinot.server.segment.query.cache.singleflight.enabled = true
pinot.server.segment.query.cache.stale_while_revalidate.ms = 0
```

## 7. Admin & Debug APIs

Broker:

- POST /queryCache/invalidate?table=<tbl> → returns {removedKeys, bytes}
- POST /queryCache/clearAll → clears all
- GET /queryCache/stats → hit/miss/bytes per table

Server:

- POST /segmentCache/invalidate?table=<tbl>
- POST /segmentCache/invalidate?table=<tbl>&segment=<seg>
- POST /segmentCache/clearAll
- GET /segmentCache/stats

Per-query options:

- SET option 'resultCache' = false (broker)
- SET option 'segmentCache' = false (server)

## 8. Observability

- Metrics (Broker/Server):
  - cache.hit, cache.miss, cache.put, cache.evict, cache.invalidate
  - cache.size.bytes, cache.value.bytes
  - cache.load.latency.ms, cache.hit.savings.ms (derived)
  - cache.singleflight.waiters
- Tracing: add spans cache.lookup, cache.miss_compute, cache.put, with tags: key.type, table, segment, reason (hit/miss/cannot-cache).
- Logs: structured log when bypassing the cache due to policy/size/option.

## 9. Backends

### 9.1 In-memory (Caffeine)

- Weighted LRU, TTL after write, optional SWR.
- Zero external dependencies; default choice.
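To show how the default backend could satisfy the SPI from §5.2 with byte-accurate weighing (§5.4), TTL after write, and predicate-based invalidation (§5.7), here is a hedged sketch. It assumes the Caffeine library and the hypothetical `SegmentCacheKey`/`SegmentCacheValue` types sketched earlier; it is one possible shape, not the proposed implementation.

```
import java.time.Duration;
import java.util.function.Predicate;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

// Hypothetical Caffeine-backed implementation of the SegmentResultCache SPI
// sketched in §5.2: byte-weighted eviction, TTL after write, and
// predicate-based invalidation for segment/schema lifecycle events.
public final class CaffeineSegmentResultCache implements SegmentResultCache {
  private final Cache<SegmentCacheKey, SegmentCacheValue> _cache;

  public CaffeineSegmentResultCache(long maxBytes, Duration ttl) {
    _cache = Caffeine.newBuilder()
        .maximumWeight(maxBytes)
        // Weight entries by serialized (post-compression) size so the bound is in bytes.
        .weigher((SegmentCacheKey key, SegmentCacheValue value) -> value.serializedSizeBytes())
        .expireAfterWrite(ttl)
        .build();
  }

  @Override
  public SegmentCacheValue get(SegmentCacheKey key) {
    return _cache.getIfPresent(key);  // null on miss
  }

  @Override
  public void put(SegmentCacheKey key, SegmentCacheValue value, long weightBytes) {
    // Caffeine derives the weight from the weigher above; weightBytes is informational here.
    _cache.put(key, value);
  }

  @Override
  public void invalidate(Predicate<SegmentCacheKey> predicate) {
    // E.g. per §5.7, on segment reload: invalidate(k -> k.segmentName.equals(reloadedSegment))
    _cache.asMap().keySet().removeIf(predicate);
  }

  @Override
  public void clear() {
    _cache.invalidateAll();
  }
}
```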
### 9.2 Redis (Optional)

- Pros: cross-broker sharing for the result cache; warm restarts.
- Cons: network latency; needs timeouts and a circuit breaker.

### 9.3 RocksDB (Optional, server only)

- Pros: large local capacity; survives restarts.
- Cons: file IO; compaction overhead.

## 10. Testing Strategy

**Unit:**

- Key canonicalization invariants (commutativity, literal normalization).
- TTL/eviction and weight accounting.
- Invalidation on segment reload and schema change.

**Integration:**

- Mini-cluster with offline + sealed realtime segments.
- Workloads: (1) agg only, (2) agg + group-by, (3) selection without ORDER BY.
- Verify correctness vs. cache disabled; measure latency improvements.

**Chaos/Failure:**

- Kill/restart brokers/servers → ensure no corruption and acceptable warm/cold behavior.
- Redis/RocksDB outages → graceful degradation to misses.

## 11. Rollout Plan

1. Phase 1 (MVP): broker final result cache; server cache for agg, group-by, and selection (no ORDER BY) on offline + sealed realtime. Upsert & consuming disabled.
2. Phase 2: DocIdSet (filter) micro-cache; ORDER BY top-k per-segment cache; adaptive TTL; per-type pools.
3. Phase 3: remote backends (Redis/RocksDB) hardened; SWR; auto warm-up hooks.

## 12. Performance & Sizing Guidance

- Start with broker.max.bytes = 512MB and server.max.bytes = 256MB per node.
- Expect 20–70% hit rates on dashboard workloads with 15–60s refresh intervals.
- Benefits scale with segment immutability and repeated plan shapes.
- Monitor cache.hit.savings.ms to convert wins into CPU-hours saved.

## 13. Alternatives Considered

- Only a broker cache: simpler, but misses per-segment compute savings and limits reuse across similar (not identical) queries.
- Only a server cache: helps compute but not network/merge cost; less impactful for identical dashboards.
- No cache: relies purely on block/index caches; inadequate for repeated analytics.

## 14. Risks & Mitigations

- Stale results: strong versioning in keys; conservative defaults; short TTLs.
- Memory blowups: byte-accurate weigher; per-table limits; robust metrics.
- Complexity: clear disable flags; thorough observability; staged rollout.

## 15. Implementation Tasks (GitHub Issue Breakdown)

**Epic: Broker Result Cache**

1. Broker cache config & SPI skeleton
2. Caffeine backend + metrics
3. Canonical key generator (SQL → AST → canonical string)
4. Broker integration + store path
5. Invalidation hooks (segment/schema/config)
6. Admin REST + RBAC
7. Unit & integration tests; docs

**Epic: Server Segment Cache**

1. SegmentResultCache SPI + configs
2. Caffeine backend (weighted) + compression
3. Plan signature builder (filter/projection/agg)
4. Execution hook integration (pre/post operators)
5. Lifecycle invalidation wiring (InstanceDataManager)
6. Realtime consuming & upsert safety gates
7. Metrics & tracing; debug flags
8. Admin REST + RBAC
9. Unit/IT, chaos tests; docs

**Epic: Optional Backends**

1. Redis backend (broker + server), with timeouts and a circuit breaker
2. RocksDB backend (server)

## 16. Appendix A: Pseudocode

```
// Broker
Optional<BrokerResponseNative> cached = brokerCache.get(key);
if (cached.isPresent()) {
  return cached.get();
}
BrokerResponseNative resp = executeDownstream(query);
if (isCacheable(resp, query)) {
  brokerCache.put(key, resp, serializedSize(resp));
}
return resp;

// Server (per-segment)
SegmentCacheKey key = composeKey(segment, planSig, opts, ...);
SegmentCacheValue val = segmentCache.get(key);
if (val != null) {
  return val;  // short-circuit
}
SegmentCacheValue computed = runOperators(segment, plan);
if (isCacheable(computed)) {
  segmentCache.put(key, computed, size(computed));
}
return computed;
```
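The `isCacheable` calls in the pseudocode above are left abstract; as one illustrative reading of the §5.6 skip rules, a broker-side store gate might look like the following. The class and parameters are hypothetical stand-ins for information available on the broker response and query context, not an existing Pinot API.

```
import java.util.Map;

// Hypothetical broker-side store gate matching the skip rules in §5.6.
final class CachePolicy {
  static boolean isCacheable(boolean hasExceptions, boolean isPartialResult,
                             Map<String, String> queryOptions) {
    if (hasExceptions || isPartialResult) {
      return false;  // never store errored or partial responses
    }
    // Honor the per-query escape hatch: SET option 'resultCache' = false.
    return Boolean.parseBoolean(queryOptions.getOrDefault("resultCache", "true"));
  }
}
```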
## 17. Appendix B: Industry Comparison (Condensed)

- Druid: broker result cache + historical segment cache; keys include query + segment version; invalidation on version bump.
- ClickHouse: final result cache keyed by AST + part versions; heavy reliance on block/index caches; no per-part result cache.
- StarRocks: BE per-tablet plan-signature cache; strong versioning; partial reuse supported.

## 18. Assets

- This RFC Doc: https://docs.google.com/document/d/1qUjLEJnhODD3oJrOP_Q15JP-Tb4RkLLNyDHYaWcztMg/edit?usp=sharing
- Design Doc: to be added later.
