gortiz commented on PR #18262: URL: https://github.com/apache/pinot/pull/18262#issuecomment-4386432835
# Review: PR #18262 — [MSE] Introduce Apache Arrow as a columnar block format for MseBlock.Data

**Repo:** apache/pinot
**Branch:** pchaganl/arrow-block-representation → master
**Author:** praveenc7 (Praveen)
**Date:** 2026-04-20

**Repo context loaded:**
- [x] CLAUDE.md read
- [x] CODEOWNERS read (not present in repo; noted as observation)
- [x] Module-level docs for touched paths read

---

## Stage 1: Triage & Evidence Collection

### PR description

- [x] Description present
- [x] States what problem is being solved
- [x] States why this approach
- [x] States risky areas

**Findings:** The description is thorough. It includes an ASCII diagram of the four-piece stack, a one-line summary for each layer, and an explicit list of what is NOT wired yet (no operator produces `ArrowBlock`s; the wire format is unchanged). Risky areas (off-heap lifetime, allocator ownership) are documented. No issues.

### Change summary

- [x] Full diff read
- [x] Summary produced
- [x] Files listed by module

**Summary:** Introduces four new classes and wires them behind a feature flag (`pinot.multistage.engine.use.arrow`, default `false`). No operator consumes or produces `ArrowBlock`s; the PR establishes infrastructure only. Both existing `MseBlock.Data.Visitor` implementations (`GrpcSendingMailbox`, `BlockSplitter`) add a `visit(ArrowBlock)` stub that throws `UnsupportedOperationException`. Tests exercise round-tripping and null handling for every supported scalar type.
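To make the visitor contract change concrete, here is a minimal, self-contained sketch of a data-block visitor that gains a `visit(ArrowBlock)`-style method, with an existing consumer adding a fail-fast stub. All names below (`DataVisitor`, `DataStub`, `RowHeapBlockStub`, `ArrowBlockStub`, `LegacySendingVisitor`) are hypothetical stand-ins; they are not Pinot's actual `MseBlock.Data.Visitor` signatures.

```java
// Hypothetical stand-ins for MseBlock.Data and MseBlock.Data.Visitor; the names and
// signatures are illustrative only, not Pinot's real API.
interface DataVisitor<R> {
  R visit(RowHeapBlockStub block);

  // Newly added method: every existing implementor must now provide it.
  R visit(ArrowBlockStub block);
}

interface DataStub {
  <R> R accept(DataVisitor<R> visitor);
}

final class RowHeapBlockStub implements DataStub {
  final int _numRows;

  RowHeapBlockStub(int numRows) {
    _numRows = numRows;
  }

  @Override
  public <R> R accept(DataVisitor<R> visitor) {
    return visitor.visit(this);
  }
}

final class ArrowBlockStub implements DataStub {
  @Override
  public <R> R accept(DataVisitor<R> visitor) {
    return visitor.visit(this);
  }
}

// An existing consumer (think GrpcSendingMailbox or BlockSplitter) updated with a
// fail-fast stub: row-heap blocks keep working, Arrow blocks are rejected loudly.
final class LegacySendingVisitor implements DataVisitor<String> {
  @Override
  public String visit(RowHeapBlockStub block) {
    return "sent " + block._numRows + " rows";
  }

  @Override
  public String visit(ArrowBlockStub block) {
    throw new UnsupportedOperationException("ArrowBlock is not supported by this visitor yet");
  }
}
```

Throwing rather than silently dropping data means an Arrow block that reaches a legacy path surfaces immediately; the trade-off is that the mistake is caught at runtime rather than at compile time.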
**Files:**

| Module | Files |
|---|---|
| pinot-common | `ArrowDataBlock.java` (new), `DataBlock.java` (ARROW enum type), `ColumnarDataBlock.java` (comment fix), `ArrowDataBlockTest.java` (new) |
| pinot-query-runtime | `ArrowBlock.java` (new), `ArrowBlockConverter.java` (new), `ArrowBuffers.java` (new), `MseBlock.java` (asArrow/isArrow), `BlockSplitter.java` (stub), `GrpcSendingMailbox.java` (stub), `ArrowBlockConverterTest.java` (new), `pom.xml` (arrow-vector and arrow-memory-netty deps) |
| pinot-spi | `CommonConstants.java` (7 new constants) |

### Risk classification

- [x] Classified as medium
- [x] Reason stated

**Classification:** Medium

**Reason:** The feature is off by default, and no operator path is activated. However, the changes touch `DataBlock.java` (an SPI used across the codebase), `MseBlock.Data.Visitor` (every mailbox and exchange path), and `CommonConstants` (configuration contracts). The off-heap allocator adds a new failure mode (OOM, leak) that is invisible to existing monitoring. The `DataBlock.Type.ARROW(3)` enum value adds a new type code to the binary serialization header.

### Test gaps

- [x] Changed behaviors/contracts checked for corresponding tests
- [x] Test coverage of new/changed behavior verified

**Gaps:**

1. **Concern — medium confidence.** `ArrowBuffers.create(PinotConfiguration)` is not tested anywhere; all tests construct `ArrowBuffers` directly via the public constructor. The factory method references `CommonConstants.Helix.CONFIG_OF_ARROW_ALLOCATOR_MAX_SIZE` and related constants but is never exercised end-to-end. Evidence: `ArrowBlockConverterTest:412` uses `new ArrowBuffers(true, new RootAllocator(Long.MAX_VALUE), 0, Long.MAX_VALUE)` directly.
2. **Observation — high confidence.** No test verifies that a disabled `ArrowBuffers` (the default) refuses `newQueryAllocator` / `newAllocator` calls. This is minor because the guard code is simple, but the path is not covered.
3. **Observation — high confidence.** No test covers `ArrowBlock.asSerialized()` in isolation (only through `testRoundTripViaAsRowHeap`). This is adequate given the method is a one-liner.

---

## Stage 2: Deep Review

### Does the implementation address the stated problem?

- [x] Problem statement identified from the PR description
- [x] Implementation traced through the code to check whether it appears to address the stated problem
- [x] No obvious gaps between what the PR claims to fix and what the code actually does

**Findings:** The PR claims to add foundation types only, with no operator wiring. This is verified: no existing operator or mailbox receives or produces `ArrowBlock`s, and the feature flag gates everything. The implementation matches the stated goals.

### High-risk files (medium, high)

- [x] Files ranked by logical risk
- [x] Risk reasons stated per file

| Rank | File | Risk reason |
|---|---|---|
| 1 | `DataBlock.java` | SPI interface change; adds the `ARROW(3)` enum value and two new methods. Downstream serialization and deserialization switch on this enum. |
| 2 | `MseBlock.java` | Adds `visit(ArrowBlock)` to the `Data.Visitor` interface, which breaks any implementation that does not implement the new method. `GrpcSendingMailbox` and `BlockSplitter` are updated in the same PR. |
| 3 | `ArrowBuffers.java` | Manages the off-heap `RootAllocator`; an incorrect config constant reference or a missing `close()` call causes an off-heap leak or silent misconfiguration. |
| 4 | `ArrowBlockConverter.java` | Data integrity on conversion: null handling, type coercion, and resource leaks on failure. |
| 5 | `ArrowBlock.java` | `asRowHeap()` / `asSerialized()` fallbacks; incorrect type dispatch causes `ClassCastException` or silent data corruption. |

**Findings:** No critical structural issues found in the high-risk files individually. Specific issues are documented below.
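The `MseBlock.java` risk, together with the `isSerialized()` default raised later in this review, can be illustrated with a minimal sketch. It assumes a simplified interface whose `isSerialized()` defaults to `!isRowHeap()`, as this review describes, and contrasts it with the suggested `return false` default. `CurrentDefaultData`, `ExplicitDefaultData`, the two block classes, `DispatchSketch`, and the `reduceRowHeap` route name are all hypothetical, not Pinot code.

```java
// Hypothetical simplification of MseBlock.Data, based on this review's description
// of the current default; not Pinot's real interface.
interface CurrentDefaultData {
  boolean isRowHeap();

  // Current default: anything that is not row-heap is assumed to be serialized.
  default boolean isSerialized() {
    return !isRowHeap();
  }
}

// The alternative suggested in the review: serialized blocks must opt in explicitly.
interface ExplicitDefaultData {
  boolean isRowHeap();

  default boolean isSerialized() {
    return false;
  }
}

// A future, third block type that forgets to override isSerialized().
final class NewBlockCurrentDefault implements CurrentDefaultData {
  @Override
  public boolean isRowHeap() {
    return false;
  }
}

final class NewBlockExplicitDefault implements ExplicitDefaultData {
  @Override
  public boolean isRowHeap() {
    return false;
  }
}

// Stand-in for the QueryDispatcher routing decision; "reduceRowHeap" is hypothetical.
final class DispatchSketch {
  static String route(boolean isRowHeap, boolean isSerialized) {
    if (isSerialized) {
      return "reduceSerialized";
    }
    if (isRowHeap) {
      return "reduceRowHeap";
    }
    // With the explicit default, a forgotten override ends up here and fails fast.
    throw new IllegalStateException("Unrecognized block representation");
  }
}
```

Under the current default, the forgotten override is silently routed to `reduceSerialized`; with `return false`, the same mistake fails fast in this sketch. The cost of the alternative is that every existing serialized block class must then override `isSerialized()` to return `true`.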
### Center of gravity (medium, high)

- [x] 1-3 key files or decisions identified

**Files:** `ArrowDataBlock.java`, `ArrowBlockConverter.java`, `MseBlock.java`

**Why these are central:** `ArrowDataBlock` is the foundational data-layer type; all reads and null semantics flow through it. `ArrowBlockConverter` is the only bridge from the legacy format to Arrow and determines type correctness and null fidelity for every conversion. `MseBlock.java` defines the `Visitor` contract, which every consumer will need to update as Arrow adoption grows.

### Invariants — targeted (medium, high)

- [x] Authorization — N/A, no trust boundary touched
- [x] Idempotency — N/A, no mutations
- [x] Backward compatibility — APIs: existing endpoints unchanged; `MseBlock.Data.Visitor` gains a new `visit(ArrowBlock)` method. All existing non-test implementations are updated in the PR, so compilation is preserved.
- [x] Backward compatibility — Schemas: `DataBlock.Type.ARROW(3)` added. `ZeroCopyDataBlockSerde.deserialize` hits its `default` case for type 3 and throws `UnsupportedOperationException`. This is safe because `ArrowDataBlock.serialize()` already throws `UnsupportedOperationException`, so no ARROW-typed frame can appear on the wire.
- [x] Backward compatibility — SPI: `DataBlock` interface gains `getBigDecimalArray()` and `getBytesArray()` (see blocker below).
- [x] Backward compatibility — Configuration: 7 new constants added, all with safe defaults. Feature off by default.
- [x] Ordering guarantees — N/A
- [x] Null/empty handling — Verified: `ArrowDataBlock.getNullRowIds` uses `vector.isNull(row)`, which is correct. `ArrowBlockConverter` passes `nullBitmap` to all column writers. All writers skip null rows and call `setNull(row)` on the vector. The empty-block case (`numRows == 0`) correctly sets `valueCount(0)` and returns.
- [x] Retry safety — N/A, stateless conversion
- [x] Observability — No metrics added. Off-heap allocation is invisible to existing JVM heap monitoring. Noted as concern.
- [x] Resource cleanup — `ArrowDataBlock.close()` closes `_root`. `ArrowBlock.close()` delegates to `ArrowDataBlock.close()`. `ArrowBlockConverter.fromDataBlock()` wraps population in try/catch and closes `root` on any exception. `ArrowBuffers` closes its `RootAllocator` on `close()`. No obvious leaks in the happy path.
- [x] Migration reversibility — N/A, no schema migration
- [x] Performance in hot paths — Feature is off by default; zero cost on the default path. When enabled, `ArrowBlockConverter.toArrowBlock` first calls `block.asSerialized()` (an expensive serialization step), then re-reads the serialized form into Arrow vectors. This double materialization is a performance concern for large blocks once the feature is enabled.
- [x] API design — `ArrowBlock.isSerialized()` is explicitly overridden to return `false`. Without the override, the `MseBlock.Data` default (`return !isRowHeap()`) would return `true` for `ArrowBlock`, misrouting `QueryDispatcher` to `reduceSerialized`. The override is present and correct, but the default is semantically broken for any future third block type that does not remember to override it. Noted as concern.

**Findings:**

- **Concern — medium confidence.** `ArrowBlock.close()` is not reference-counted, and its Javadoc warns that calling it twice is undefined. If a caller holds both an `ArrowBlock` and the underlying `ArrowDataBlock` and closes both, `VectorSchemaRoot.close()` is called twice. Arrow may tolerate a second close without corrupting memory, but this has not been verified for the Arrow version in use, and the contract is brittle. Evidence: `ArrowBlock.java:168`, `ArrowDataBlock.java:252`.
- **Observation — medium confidence.** The `MseBlock.Data.isSerialized()` default (`return !isRowHeap()`) is semantically incorrect for `ArrowBlock` unless explicitly overridden. The override is present, but any future `MseBlock.Data` implementor that forgets to override it would silently misroute in `QueryDispatcher`. Consider changing the default to `return false` or documenting the invariant explicitly. Evidence: `MseBlock.java:102-104`.
- **Observation — medium confidence.** No metrics or log events are emitted when an Arrow allocation fails or when the Arrow path is entered. During a production incident involving off-heap OOM errors, there is no signal to distinguish Arrow allocation failures from regular heap OOM. This should be addressed before Arrow is enabled in production.

### Invariants — full sweep (high only)

- [x] N/A — skipped (medium risk)

### Test adequacy (medium, high)

- [x] Tests prove intended behavior (not just that code runs)
- [x] Failure modes covered
- [x] Boundary conditions covered
- [x] Not over-coupled to implementation details

**Findings:** Test quality is high. `ArrowDataBlockTest.testAllScalarTypesHappyAndNullPath` covers every supported scalar type with a non-null/null/non-null row pattern and verifies the null bitmap for each column independently. `ArrowBlockConverterTest.testConverterRoundTripAllTypes` does the same through the full conversion path and adds a regression guard that `DataSchema` column types survive the Arrow round trip (preventing TIMESTAMP from collapsing to LONG, etc.). `testCloseReleasesOffHeapMemory` verifies that the allocator reports zero after close, which is a strong invariant for off-heap tracking.

**Gap:** No test verifies what happens when the `RootAllocator` runs out of budget mid-conversion (off-heap OOM). This is acceptable for a foundation PR but should be noted for future hardening.

### Collateral damage (medium, high)

- [x] No copy-paste duplication introduced
- [x] No partial renames (all references updated)
- [x] No dead code left behind
- [x] Docs and config match code changes
- [x] No one-off exceptions added to shared abstractions

**Findings:**

- **Observation — high confidence.** The `ColumnarDataBlock.java` change (moving a misindented comment) is a drive-by fix with no functional effect. Fine.
- **Observation — medium confidence.** `ArrowBuffers.create()` reads `CONFIG_OF_ARROW_ALLOCATOR_MAX_SIZE` from `CommonConstants.Helix`, and `CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED` and the new `CONFIG_OF_MULTI_STAGE_ENGINE_USE_ARROW` are also in `CommonConstants.Helix` (confirmed from the diff hunk at `@@ -266`, which is after the `Instance` inner class closes). The placement is internally consistent. No issue.

### Path-triggered review (medium, high; also low if boundary paths touched)

- [x] Touched paths listed
- [x] Failure mode inferred per path
- [x] Reviewed for inferred risks
- [x] Boundary areas checked first

**Touched paths and inferred risks:**

| Path | Inferred failure mode | What was checked | Findings |
|---|---|---|---|
| `pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlock.java` | SPI change breaks implementors | Checked all `DataBlock` implementors for the new interface methods | **BLOCKER** — `ArrowDataBlock` is missing `getBigDecimalArray()` and `getBytesArray()` (see blocker section) |
| `pinot-query-runtime/src/main/java/.../blocks/MseBlock.java` | Visitor extension breaks all implementors | Checked `GrpcSendingMailbox` and `BlockSplitter` for the new `visit(ArrowBlock)` | Both updated in this PR; no other visitor implementations found in production code |
| `pinot-query-runtime/src/main/java/.../memory/ArrowBuffers.java` | Off-heap OOM, leak on shutdown | Checked `close()`, `checkEnabled()`, disabled-mode `RootAllocator` creation | Disabled mode correctly creates a zero-sized allocator; `close()` is unconditional and correct |
| `pinot-spi/src/main/java/.../spi/utils/CommonConstants.java` | Config namespace collision | Checked constant placement and `ArrowBuffers` references | No collision; all constants are in the `Helix` class, consistent with `ArrowBuffers` usage |
| `pinot-query-runtime/pom.xml` | New transitive dependency breaks the dependency graph | Checked `arrow-vector` and `arrow-memory-netty` scope | `arrow-vector` compile scope (correct), `arrow-memory-netty` runtime scope (correct) |

**Findings:** See blocker below.

---

## Verdict

**Risk:** Medium
**Verdict:** changes-requested
**Verdict confidence:** High

### Confidence reasoning

The diff is clean and well-documented. All CI checks pass against the PR's base commit. The blocker comes from an interface addition made after this PR was opened: PR #18326 was merged 5 days later and added `getBigDecimalArray()` and `getBytesArray()` to the `DataBlock` interface. This is a factual, traceable finding with a specific fix. The other findings are concerns and observations that do not affect the correctness of the infrastructure in its current (feature-off) state, but they should be addressed before enabling Arrow in production.

### Blockers

1. **BLOCKER — high confidence.** `ArrowDataBlock` does not implement `getBigDecimalArray(int rowId, int colId)` or `getBytesArray(int rowId, int colId)`, both of which the `DataBlock` interface now requires. These methods were added to `DataBlock` by PR #18326 (commit `aad170afaa`, merged 2026-04-25), after this PR was created (2026-04-20). The PR must be rebased and the methods added; both should throw `UnsupportedOperationException`, consistent with the other unsupported array methods in `ArrowDataBlock`. Evidence: `DataBlock.java:86,90`; `ArrowDataBlock.java` (implementation absent). Severity: blocker. Confidence: high.

### Concerns

1. **Concern — medium confidence.** `ArrowBuffers.create(PinotConfiguration)` is the intended production entry point for initializing the allocator, but no test calls it; the only tests use the direct constructor. If a config key name drifts or the `PinotConfiguration` lookup semantics change, `create()` will silently misconfigure the allocator without any test failing. Evidence: `ArrowBuffers.java:257-270`, `ArrowBlockConverterTest.java:412`. Severity: concern.
2. **Concern — medium confidence.** Off-heap allocations by `ArrowBuffers` are invisible to existing JVM heap monitoring and metrics. Once Arrow is enabled, an exhausted off-heap budget will surface as an allocation failure (Arrow's `OutOfMemoryException` when the allocator limit is reached) with no Pinot-level metric to distinguish it from a heap OOM. Before enabling Arrow in production, emit a metric (e.g., `arrowAllocatedBytes`) from `getAllocatedMemory()`. Evidence: `ArrowBuffers.java:313-316`. Severity: concern.
3. **Concern — medium confidence.** `ArrowBlock.close()` is neither guarded against repeated calls nor reference-counted. Its Javadoc states that "calling it twice will attempt to close the underlying `ArrowDataBlock` twice." Arrow's `VectorSchemaRoot` may treat a second close as a no-op, but this is unverified for the version in use, and the contract is brittle for future consumers that hold both references. Consider adding a `_closed` guard. Evidence: `ArrowBlock.java:165-168`. Severity: concern.
4. **Concern — low confidence.** `ArrowBlockConverter.toArrowBlock()` always routes non-Arrow inputs, including `RowHeapDataBlock`, through `block.asSerialized()`. Every conversion therefore pays for a full heap→serialized→Arrow materialization. This is acceptable for a foundation PR, but the path should be revisited before Arrow is used in a hot query path. Evidence: `ArrowBlockConverter.java:101-108`. Severity: concern.

### Observations

1. The `MseBlock.Data.isSerialized()` default (`return !isRowHeap()`) is a semantic time bomb for future block types. Any new `MseBlock.Data` implementor that forgets to override `isSerialized()` will incorrectly report itself as serialized and be misrouted in `QueryDispatcher`. Consider changing the default to `return false`. Evidence: `MseBlock.java:102-104`.
2. `ArrowBlock.asRowHeap()` and `asSerialized()` are marked `TODO: remove this method`. These fallbacks exist for operator compatibility but add code that will need cleanup. The TODO is appropriate as-is given the staged rollout plan.
3. `ColumnarDataBlock.java` has a drive-by fix for a misindented comment. Harmless.
4. No CODEOWNERS file was found in this repository, so reviews are unassigned; human reviewers should route this to the MSE/query-runtime team.

### Areas recommended for human focus

1. **The rebase gap:** Confirm that `ArrowDataBlock` is updated to add `getBigDecimalArray()` and `getBytesArray()` throwing `UnsupportedOperationException`, and re-run the unit tests after rebasing on master.
2. **`ArrowBuffers.create()` lifecycle:** Verify that production component startup code (ServerQueryExecutorRunnerFactory or equivalent) will call `ArrowBuffers.create()` and inject the result correctly before this feature is enabled. This path is completely untested.
3. **Double-close semantics:** Validate whether calling `ArrowDataBlock.close()` twice is safe in Arrow 19.0.0 (the version in use). If it is not a no-op, add a `_closed` guard.

### Status

- [x] All applicable Stage 1 boxes checked
- [x] All applicable Stage 2 boxes checked (or N/A for skipped tiers)
- [x] Verdict committed
- [x] Verdict confidence assigned with reasoning
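For the double-close concern (Concern 3 and the third focus area), a minimal sketch of an idempotent `close()` using an `AtomicBoolean` `_closed` guard might look like the following. `CountingResource` and `GuardedBlock` are hypothetical stand-ins for the underlying `ArrowDataBlock`/`VectorSchemaRoot` and for `ArrowBlock`; a plain `boolean` field could suffice if the real class is confined to a single thread.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the underlying ArrowDataBlock / VectorSchemaRoot.
// It counts close() calls so the effect of the guard is observable.
final class CountingResource implements AutoCloseable {
  int _closeCalls = 0;

  @Override
  public void close() {
    _closeCalls++;
  }
}

// Hypothetical stand-in for ArrowBlock with an idempotent close().
final class GuardedBlock implements AutoCloseable {
  private final CountingResource _resource;
  private final AtomicBoolean _closed = new AtomicBoolean(false);

  GuardedBlock(CountingResource resource) {
    _resource = resource;
  }

  @Override
  public void close() {
    // compareAndSet lets only the first caller through, even across threads;
    // later calls return without touching the underlying resource.
    if (_closed.compareAndSet(false, true)) {
      _resource.close();
    }
  }

  boolean isClosed() {
    return _closed.get();
  }
}
```

Note that the guard only protects calls made through the wrapper. A caller that closes the underlying block directly still needs either a matching guard there or a single, clearly documented owner of the resource.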
