gortiz commented on PR #18262:
URL: https://github.com/apache/pinot/pull/18262#issuecomment-4386432835

   # Review: PR #18262 — [MSE] Introduce Apache Arrow as a columnar block 
format for MseBlock.Data
   
   **Repo:** apache/pinot
   **Branch:** pchaganl/arrow-block-representation → master
   **Author:** praveenc7 (Praveen)
   **Date:** 2026-04-20
   
   **Repo context loaded:**
   - [x] CLAUDE.md read
   - [x] CODEOWNERS read (not present in repo; noted as observation)
   - [x] Module-level docs for touched paths read
   
   ---
   
   ## Stage 1: Triage & Evidence Collection
   
   ### PR description
   
   - [x] Description present
   - [x] States what problem is being solved
   - [x] States why this approach
   - [x] States risky areas
   
   **Findings:** The description is thorough — it includes an ASCII diagram 
showing the four-piece stack, one-line summaries for each layer, and an 
explicit list of what is NOT wired yet (no operator produces ArrowBlocks, wire 
format unchanged). Risky areas (off-heap lifetime, allocator ownership) are 
documented. No issues.
   
   ### Change summary
   
   - [x] Full diff read
   - [x] Summary produced
   - [x] Files listed by module
   
   **Summary:** Introduces four new classes and wires them behind a feature 
flag (`pinot.multistage.engine.use.arrow`, default false). No operator consumes 
or produces `ArrowBlock`s; the PR establishes infrastructure only. All three 
existing `MseBlock.Data.Visitor` implementations (`GrpcSendingMailbox`, 
`BlockSplitter`) add a `visit(ArrowBlock)` stub that throws 
`UnsupportedOperationException`. Tests exercise round-tripping and null 
handling for every supported scalar type.
   
   **Files:**
   
   | Module | Files |
   |---|---|
   | pinot-common | `ArrowDataBlock.java` (new), `DataBlock.java` (ARROW enum 
type), `ColumnarDataBlock.java` (comment fix), `ArrowDataBlockTest.java` (new) |
   | pinot-query-runtime | `ArrowBlock.java` (new), `ArrowBlockConverter.java` 
(new), `ArrowBuffers.java` (new), `MseBlock.java` (asArrow/isArrow), 
`BlockSplitter.java` (stub), `GrpcSendingMailbox.java` (stub), 
`ArrowBlockConverterTest.java` (new), `pom.xml` (arrow-vector dep) |
   | pinot-spi | `CommonConstants.java` (7 new constants) |
   
   ### Risk classification
   
   - [x] Classified as medium
   - [x] Reason stated
   
   **Classification:** Medium
   
   **Reason:** The feature is off by default; no operator path is activated. 
However, changes touch `DataBlock.java` (an SPI used across the codebase), 
`MseBlock.Data.Visitor` (every mailbox and exchange path), and 
`CommonConstants` (configuration contracts). The off-heap allocator adds a new 
failure mode (OOM, leak) invisible to existing monitoring. The 
`DataBlock.Type.ARROW(3)` enum value changes the binary serialization header 
format.
   
   ### Test gaps
   
   - [x] Changed behaviors/contracts checked for corresponding tests
   - [x] Test coverage of new/changed behavior verified
   
   **Gaps:**
   
   1. **Concern — medium confidence.** 
`ArrowBuffers.create(PinotConfiguration)` is not tested anywhere. All tests 
construct `ArrowBuffers` directly via the public constructor. The factory 
method references `CommonConstants.Helix.CONFIG_OF_ARROW_ALLOCATOR_MAX_SIZE` 
and related constants but is never exercised end-to-end. Evidence: 
`ArrowBlockConverterTest:412` uses `new ArrowBuffers(true, new 
RootAllocator(Long.MAX_VALUE), 0, Long.MAX_VALUE)` directly.
   
   2. **Observation — high confidence.** No test verifies that a disabled 
`ArrowBuffers` (the default) correctly refuses `newQueryAllocator` / 
`newAllocator` calls. This is minor since the guard code is simple, but the 
path is not covered.
   
   3. **Observation — high confidence.** No test covers 
`ArrowBlock.asSerialized()` in isolation (only through 
`testRoundTripViaAsRowHeap`). This is adequate given the method is a one-liner.
   
   ---
   
   ## Stage 2: Deep Review
   
   ### Does the implementation address the stated problem?
   
   - [x] Problem statement identified from the PR description
   - [x] Implementation traced through the code to check whether it appears to 
address the stated problem
   - [x] No obvious gaps between what the PR claims to fix and what the code 
actually does
   
   **Findings:** The PR claims to add foundation types only, with no operator 
wiring. This is verified: no existing operator or mailbox receives or produces 
`ArrowBlock`s. The feature flag gates everything. The implementation matches 
the stated goals.
   
   ### High-risk files (medium, high)
   
   - [x] Files ranked by logical risk
   - [x] Risk reasons stated per file
   
   | Rank | File | Risk reason |
   |---|---|---|
   | 1 | `DataBlock.java` | SPI interface change; adds `ARROW(3)` enum and two 
new methods. Downstream serialization and deserialization switch on this enum. |
   | 2 | `MseBlock.java` | Adds `visit(ArrowBlock)` to the `Data.Visitor` 
interface; breaks all existing implementations that don't implement the new 
method — except that GrpcSendingMailbox and BlockSplitter are updated in the 
same PR. |
   | 3 | `ArrowBuffers.java` | Manages off-heap RootAllocator; incorrect config 
constant reference or missing close() call causes off-heap leak or silent 
misconfiguration. |
   | 4 | `ArrowBlockConverter.java` | Data integrity on conversion; null 
handling, type coercion, and resource leak on failure. |
   | 5 | `ArrowBlock.java` | asRowHeap() / asSerialized() fallbacks; incorrect 
type dispatch causes ClassCastException or silent data corruption. |
   
   **Findings:** No critical structural issues found in the high-risk files 
individually. Specific issues are documented below.
   
   ### Center of gravity (medium, high)
   
   - [x] 1-3 key files or decisions identified
   
   **Files:** `ArrowDataBlock.java`, `ArrowBlockConverter.java`, `MseBlock.java`
   
   **Why these are central:** `ArrowDataBlock` is the foundational data-layer 
type — all reads and null semantics flow through it. `ArrowBlockConverter` is 
the only bridge from legacy format to Arrow and determines type correctness and 
null fidelity for every conversion. `MseBlock.java` defines the `Visitor` 
contract that will require updates from every consumer as Arrow adoption grows.
   
   ### Invariants — targeted (medium, high)
   
   - [x] Authorization — N/A, no trust boundary touched
   - [x] Idempotency — N/A, no mutations
   - [x] Backward compatibility — APIs: existing endpoints unchanged; 
`MseBlock.Data.Visitor` gains a new `visit(ArrowBlock)` method. All existing 
non-test implementations in the PR add the method, so compilation is preserved.
   - [x] Backward compatibility — Schemas: `DataBlock.Type.ARROW(3)` added. 
`ZeroCopyDataBlockSerde.deserialize` hits its `default` case for type 3, 
throwing `UnsupportedOperationException`. This is safe because 
`ArrowDataBlock.serialize()` already throws `UnsupportedOperationException`, so 
no ARROW-typed frame can appear on the wire.
   - [x] Backward compatibility — SPI: `DataBlock` interface gains 
`getBigDecimalArray()` and `getBytesArray()` (see blocker below).
   - [x] Backward compatibility — Configuration: 7 new constants added, all 
with safe defaults. Feature off by default.
   - [x] Ordering guarantees — N/A
   - [x] Null/empty handling — Verified: `ArrowDataBlock.getNullRowIds` uses 
`vector.isNull(row)`, correct. `ArrowBlockConverter` passes `nullBitmap` to all 
column writers. All writers skip null rows and call `setNull(row)` on the 
vector. Empty-block case (`numRows == 0`) correctly sets `valueCount(0)` and 
returns.
   - [x] Retry safety — N/A, stateless conversion
   - [x] Observability — No metrics added. Off-heap allocation is invisible to 
existing JVM heap monitoring. Noted as concern.
   - [x] Resource cleanup — `ArrowDataBlock.close()` closes `_root`. 
`ArrowBlock.close()` delegates to `ArrowDataBlock.close()`. 
`ArrowBlockConverter.fromDataBlock()` wraps population in try/catch and closes 
`root` on any exception. `ArrowBuffers` closes its `RootAllocator` on 
`close()`. No obvious leaks in happy path.
   - [x] Migration reversibility — N/A, no schema migration
   - [x] Performance in hot paths — Feature is off by default; zero cost on 
default path. When enabled, `ArrowBlockConverter.toArrowBlock` calls 
`block.asSerialized()` first (an expensive serialization step), then re-reads 
from the serialized form into Arrow vectors. This double-materialization is a 
performance concern for large blocks when the feature is eventually enabled.
   - [x] API design — `ArrowBlock.isSerialized()` is explicitly overridden to 
return `false`. Without the override, the `MseBlock.Data` default (`return 
!isRowHeap()`) would return `true` for `ArrowBlock`, misrouting 
`QueryDispatcher` to `reduceSerialized`. The override is present and correct, 
but the default is semantically broken for any future third block type that 
doesn't remember to override. Noted as concern.
   
   **Findings:**
   
   - **Concern — high confidence.** `ArrowBlock.close()` is not 
reference-counted and the Javadoc warns that calling it twice is undefined. If 
a caller holds both an `ArrowBlock` and the underlying `ArrowDataBlock` and 
closes both, `VectorSchemaRoot.close()` is called twice. Arrow's 
`VectorSchemaRoot` does have a safety check on double-close, so this would 
throw an exception rather than corrupt memory, but the contract is brittle. 
Evidence: `ArrowBlock.java:168`, `ArrowDataBlock.java:252`.
   
   - **Observation — medium confidence.** The `MseBlock.Data.isSerialized()` 
default (`return !isRowHeap()`) is now semantically incorrect for `ArrowBlock` 
without explicit override. The override is present, but any future 
`MseBlock.Data` implementor that forgets to override it would silently misroute 
in `QueryDispatcher`. Consider changing the default to `return false` or 
documenting the invariant explicitly. Evidence: `MseBlock.java:102-104`.
   
   - **Observation — medium confidence.** No metrics or log events are emitted 
when Arrow allocation fails or when the Arrow path is entered. For debugging a 
production incident with OOM errors in off-heap memory, there's no signal to 
distinguish Arrow allocation failures from regular heap OOM. This should be 
addressed before Arrow is enabled in production.
   
   ### Invariants — full sweep (high only)
   
   - [x] N/A — skipped (medium risk)
   
   ### Test adequacy (medium, high)
   
   - [x] Tests prove intended behavior (not just that code runs)
   - [x] Failure modes covered
   - [x] Boundary conditions covered
   - [x] Not over-coupled to implementation details
   
   **Findings:** Test quality is high. 
`ArrowDataBlockTest.testAllScalarTypesHappyAndNullPath` covers every supported 
scalar type with a non-null/null/non-null row pattern and verifies the null 
bitmap for each column independently. 
`ArrowBlockConverterTest.testConverterRoundTripAllTypes` does the same through 
the full conversion path and adds a regression guard that `DataSchema` column 
types survive the Arrow round-trip (preventing TIMESTAMP from collapsing to 
LONG, etc.). `testCloseReleasesOffHeapMemory` verifies the allocator reports 
zero after close, which is a strong invariant for off-heap tracking.
   
   **Gap:** No test verifies what happens when the `RootAllocator` runs out of 
budget mid-conversion (off-heap OOM). This is acceptable for a foundation PR 
but should be noted for future hardening.
   
   ### Collateral damage (medium, high)
   
   - [x] No copy-paste duplication introduced
   - [x] No partial renames (all references updated)
   - [x] No dead code left behind
   - [x] Docs and config match code changes
   - [x] No one-off exceptions added to shared abstractions
   
   **Findings:**
   
   - **Observation — high confidence.** The `ColumnarDataBlock.java` change 
(moving a misindented comment) is a drive-by fix with no functional effect. 
Fine.
   
   - **Observation — medium confidence.** `ArrowBuffers.create()` reads 
`CONFIG_OF_ARROW_ALLOCATOR_MAX_SIZE` from `CommonConstants.Helix`, while 
`CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED` and the new 
`CONFIG_OF_MULTI_STAGE_ENGINE_USE_ARROW` are also in `CommonConstants.Helix` 
(confirmed from the diff hunk at `@@ -266` which is after the `Instance` inner 
class closes). The placement is internally consistent. No issue.
   
   ### Path-triggered review (medium, high; also low if boundary paths touched)
   
   - [x] Touched paths listed
   - [x] Failure mode inferred per path
   - [x] Reviewed for inferred risks
   - [x] Boundary areas checked first
   
   **Touched paths and inferred risks:**
   
   | Path | Inferred failure mode | What was checked | Findings |
   |---|---|---|---|
   | 
`pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlock.java` | 
SPI change breaks implementors | Checked all `DataBlock` implementors for the 
new interface methods | **BLOCKER** — `ArrowDataBlock` is missing 
`getBigDecimalArray()` and `getBytesArray()` (see blocker section) |
   | `pinot-query-runtime/src/main/java/.../blocks/MseBlock.java` | Visitor 
extension breaks all implementors | Checked `GrpcSendingMailbox` and 
`BlockSplitter` for new `visit(ArrowBlock)` | Both updated in this PR; no other 
visitor implementations found in production code |
   | `pinot-query-runtime/src/main/java/.../memory/ArrowBuffers.java` | 
Off-heap OOM, leak on shutdown | Checked `close()`, `checkEnabled()`, 
disabled-mode RootAllocator creation | Disabled mode correctly creates a 
zero-sized allocator; close() unconditional and correct |
   | `pinot-spi/src/main/java/.../spi/utils/CommonConstants.java` | Config 
namespace collision | Checked constant placement and ArrowBuffers references | 
No collision; all constants in `Helix` class, consistent with `ArrowBuffers` 
usage |
   | `pinot-query-runtime/pom.xml` | New transitive dependency breaks 
dependency graph | Checked `arrow-vector` and `arrow-memory-netty` scope | 
`arrow-vector` compile scope (correct), `arrow-memory-netty` runtime scope 
(correct) |
   
   **Findings:** See blocker below.
   
   ---
   
   ## Verdict
   
   **Risk:** Medium
   **Verdict:** changes-requested
   **Verdict confidence:** High
   
   ### Confidence reasoning
   
   The diff is clean and well-documented. All CI checks pass against the PR's 
base commit. The blocker is a post-PR interface addition (PR #18326 was merged 
5 days after this PR was opened, adding `getBigDecimalArray()` and 
`getBytesArray()` to the `DataBlock` interface). This is a factual, traceable 
finding with a specific fix. The other findings are concerns and observations 
that do not block correctness of the infrastructure at its current 
(feature-off) state, but should be addressed before enabling Arrow in 
production.
   
   ### Blockers
   
   1. **BLOCKER — high confidence.** `ArrowDataBlock` does not implement 
`getBigDecimalArray(int rowId, int colId)` or `getBytesArray(int rowId, int 
colId)`, both of which are required by the `DataBlock` interface. These methods 
were added to `DataBlock` by PR #18326 (commit `aad170afaa`, merged 
2026-04-25), which post-dates this PR's creation (2026-04-20). The PR must be 
rebased or the methods must be added (both should throw 
`UnsupportedOperationException`, consistent with the other unsupported array 
methods in `ArrowDataBlock`). Evidence: `DataBlock.java:86,90`; 
`ArrowDataBlock.java` (absent implementation). Severity: blocker. Confidence: 
high.
   
   ### Concerns
   
   1. **Concern — medium confidence.** 
`ArrowBuffers.create(PinotConfiguration)` is the intended production entry 
point for initializing the allocator, but no test calls it. The only tests use 
the direct constructor. If a config key name drifts or the `PinotConfiguration` 
lookup semantics change, `create()` will silently misconfigure without any test 
failing. Evidence: `ArrowBuffers.java:257-270`, 
`ArrowBlockConverterTest.java:412`. Severity: concern.
   
   2. **Concern — medium confidence.** Off-heap allocations by `ArrowBuffers` 
are invisible to existing JVM heap monitoring and metrics. When Arrow is 
eventually enabled, off-heap OOM will produce `OutOfMemoryError` with no 
Pinot-level metric to distinguish it from heap OOM. Before enabling in 
production, emit a metric (e.g., `arrowAllocatedBytes`) via 
`getAllocatedMemory()`. Evidence: `ArrowBuffers.java:313-316`. Severity: 
concern.
   
   3. **Concern — medium confidence.** `ArrowBlock.close()` is not idempotent 
and not reference-counted. The Javadoc states "calling it twice will attempt to 
close the underlying `ArrowDataBlock` twice." Arrow's `VectorSchemaRoot` closes 
gracefully on a second call (it is a no-op after the first close), but the 
contract is brittle for future consumers who hold both references. Consider 
adding a `_closed` guard. Evidence: `ArrowBlock.java:165-168`. Severity: 
concern.
   
   4. **Concern — low confidence.** `ArrowBlockConverter.toArrowBlock()` always 
routes through `block.asSerialized()` for non-Arrow inputs, including 
`RowHeapDataBlock`. This means every conversion suffers a full 
heap→serialized→Arrow materialization. For the foundation PR this is 
acceptable, but the path should be revisited before Arrow is used in a hot 
query path. Evidence: `ArrowBlockConverter.java:101-108`. Severity: concern.
   
   ### Observations
   
   1. The `MseBlock.Data.isSerialized()` default (`return !isRowHeap()`) is a 
semantic time bomb for future block types. Any new `MseBlock.Data` implementor 
that forgets to override `isSerialized()` will incorrectly report itself as 
serialized, misrouting in `QueryDispatcher`. Consider changing the default to 
`return false`. Evidence: `MseBlock.java:102-104`.
   
   2. `ArrowBlock.asRowHeap()` and `asSerialized()` are marked with `TODO: 
remove this method`. These fallbacks exist for operator compatibility but add 
code that will need cleanup. The TODO is appropriate as-is given the staged 
rollout plan.
   
   3. `ColumnarDataBlock.java` has a drive-by fix for a misindented comment. 
Harmless.
   
   4. CODEOWNERS file was not found in this repository. Reviews are unassigned; 
human auditors should route this to the MSE/query-runtime team.
   
   ### Areas recommended for human focus
   
   1. **The rebase gap**: Confirm that `ArrowDataBlock` is updated to add 
`getBigDecimalArray()` and `getBytesArray()` throwing 
`UnsupportedOperationException`, and re-run the unit tests after rebasing on 
master.
   
   2. **ArrowBuffers.create() lifecycle**: Verify that production component 
startup code (ServerQueryExecutorRunnerFactory or equivalent) will call 
`ArrowBuffers.create()` and inject the result correctly before this feature is 
enabled. This path is completely untested.
   
   3. **Double-close semantics**: Validate that `ArrowDataBlock.close()` being 
called twice is safe in Arrow 19.0.0 (the version in use). If it is not a 
no-op, add a `_closed` guard.
   
   ### Status
   
   - [x] All applicable Stage 1 boxes checked
   - [x] All applicable Stage 2 boxes checked (or N/A for skipped tiers)
   - [x] Verdict committed
   - [x] Verdict confidence assigned with reasoning
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to