kosiew opened a new issue, #21910:
URL: https://github.com/apache/datafusion/issues/21910

   ## Summary
   
   DataFusion execution plans expose a declared output schema, but some 
operators can still emit `RecordBatch` values whose physical batch schema 
drifts from that declared schema. PR #21770 fixes one concrete instance in 
`RecursiveQueryExec`, where recursive-term batches preserved their native field 
names instead of the anchor term's plan-declared names.
   
   That PR appropriately uses `RecordBatch::try_new(...)` to rebuild the batch 
with the plan-declared schema, because this path needs to normalize field names 
and Arrow's `RecordBatch::with_schema(...)` enforces compatibility / 
containment checks that can reject that kind of rename.
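   As a minimal illustration of that distinction, here is a sketch using the arrow-rs `RecordBatch` APIs. The field names (`n`, `id`) are hypothetical stand-ins, not the actual columns from PR #21770:

   ```rust
   use std::sync::Arc;

   use arrow::array::{ArrayRef, Int64Array};
   use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
   use arrow::record_batch::RecordBatch;

   fn main() {
       // A recursive-term batch that kept its native field name "n".
       let native = Arc::new(Schema::new(vec![Field::new("n", DataType::Int64, true)]));
       let col: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
       let batch = RecordBatch::try_new(native, vec![col]).unwrap();

       // The plan-declared schema uses the anchor term's name "id".
       let declared: SchemaRef =
           Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, true)]));

       // with_schema runs Arrow's containment check, which requires matching
       // field names, so it rejects the rename...
       assert!(batch.clone().with_schema(declared.clone()).is_err());

       // ...while try_new rebuilds the batch under the declared names,
       // reusing the existing column arrays.
       let fixed = RecordBatch::try_new(declared, batch.columns().to_vec()).unwrap();
       assert_eq!(fixed.schema().field(0).name(), "id");
   }
   ```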
   
   This issue proposes introducing a small reusable execution-layer helper for 
this class of schema normalization so operators can express the intent clearly 
and handle similar cases consistently.
   
   ## Motivation
   
   The recursive CTE bug shows a recurring execution-layer risk:
   
   - the logical / plan-level schema is correct
   - runtime batches coming from an independently planned branch carry 
different field names
   - downstream consumers observe the batch-local schema rather than the plan 
contract
   
   That matters for user-visible behavior because some downstream code keys 
directly on `batch.schema().field(i).name()`, including:
   
   - sort / TopK-style paths that preserve incoming batches
   - CSV / JSON writers
   - custom collectors or adapters that inspect emitted batches directly
   
   Right now, when an execution node needs to normalize emitted batches back to 
its declared output schema, the logic is local and ad hoc. That makes it harder 
to:
   
   - spot where normalization is required
   - document when the operation is a simple metadata rebind versus a schema 
rename
   - audit similar operators in the future
   
   ## Why this should not just use `RecordBatch::with_schema(...)`
   
   It is tempting to standardize on `RecordBatch::with_schema(...)`, but that 
API is not sufficient for all relevant cases.
   
   In particular:
   
   - `with_schema(...)` is the right tool when the target schema passes Arrow's containment check against the current batch schema
   - it cannot be used when the operator intentionally needs to rename fields to the plan-declared schema, because the containment check requires matching field names
   - the recursive CTE case in PR #21770 is exactly such a rename-driven normalization
   
   That means any shared helper must be explicit about which normalization 
modes it supports. It should not accidentally hide the distinction between:
   
   - compatible schema rebinding
   - field-name normalization
   
   ## Problem Statement
   
   We do not currently have a reusable execution-layer abstraction for:
   
   - "this operator promises to emit batches matching its declared schema"
   - "these child-produced batches need to be normalized before they leave this 
operator"
   - "this normalization is allowed to rename fields, not just attach 
equivalent metadata"
   
   As a result:
   
   - operators implement the logic inline
   - the rationale is easy to lose in future edits
   - reviewers must re-derive the same invariants at each call site
   - follow-up auditing for similar bugs is harder than it needs to be
   
   ## Proposed Direction
   
   Introduce a small helper in `datafusion/physical-plan` or a nearby 
execution-focused module that makes schema normalization explicit.
   
   Possible API shapes:
   
   - `normalize_batch_schema(batch, expected_schema)`
   - `rebind_batch_to_output_schema(batch, expected_schema)`
   - a tiny helper type or utility module with focused functions
   
   The implementation should be narrow and explicit. A reasonable first cut 
could support:
   
   1. fast path: return the batch unchanged when the schema already matches
   2. compatible rebind path: use an Arrow-backed check when simple rebinding 
is valid
   3. rename normalization path: rebuild the batch with the declared schema 
when name normalization is intentionally required
   
   If those paths need distinct semantics, it may be better to expose them as separate helpers rather than one overly smart abstraction.
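   One possible shape for the single-entry-point variant, sketched with arrow-rs APIs. The name `normalize_batch_schema` is taken from the list above and is hypothetical; nothing like this exists in DataFusion today:

   ```rust
   use std::sync::Arc;

   use arrow::array::{ArrayRef, Int64Array};
   use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
   use arrow::error::ArrowError;
   use arrow::record_batch::RecordBatch;

   /// Hypothetical helper; illustrates the three paths described above.
   fn normalize_batch_schema(
       batch: RecordBatch,
       expected: SchemaRef,
   ) -> Result<RecordBatch, ArrowError> {
       // 1. Fast path: the batch already matches the declared schema.
       if batch.schema() == expected {
           return Ok(batch);
       }
       // 2. Compatible rebind: zero-copy when Arrow's containment check passes.
       if expected.contains(batch.schema().as_ref()) {
           return batch.with_schema(expected);
       }
       // 3. Rename normalization: rebuild under the declared schema; try_new
       //    still validates column count, types, and row lengths.
       RecordBatch::try_new(expected, batch.columns().to_vec())
   }

   fn main() {
       // Child batch with a native field name that needs normalization.
       let child = RecordBatch::try_new(
           Arc::new(Schema::new(vec![Field::new("n", DataType::Int64, true)])),
           vec![Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef],
       )
       .unwrap();
       let declared = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, true)]));
       let out = normalize_batch_schema(child, declared).unwrap();
       assert_eq!(out.schema().field(0).name(), "id");
   }
   ```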
   
   ## Scope
   
   In scope:
   
   - define a reusable helper or helper pair for execution-layer schema 
normalization
   - migrate `RecursiveQueryExec` to use the shared utility if that improves 
clarity
   - document why name-normalizing call sites cannot always use 
`with_schema(...)`
   - audit a small number of nearby operators that combine or forward batches 
from multiple branches
   
   Out of scope:
   
   - a large refactor of every `RecordBatch` construction site in the workspace
   - changing Arrow's schema compatibility semantics
   - redesigning logical planning or schema coercion rules
   
   ## Design Considerations
   
   ### 1. Keep the helper honest about semantics
   
   The key design trap is over-generalization. A helper that silently mixes 
"equivalent schema rebind" and "intentional field rename normalization" could 
make the code less clear, not more.
   
   The implementation should make the contract obvious at the call site.
   
   ### 2. Preserve zero-copy behavior where possible
   
   When simple schema rebinding is valid, the helper should avoid data copies. 
But the API should not pretend every normalization path is identical if some 
cases require reconstructing the `RecordBatch`.
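   For reference, the compatible-rebind path is already zero-copy in arrow-rs: `with_schema` moves the existing column `Arc`s into the new batch. A small sketch, using extra schema-level metadata as the compatible change:

   ```rust
   use std::collections::HashMap;
   use std::sync::Arc;

   use arrow::array::{ArrayRef, Int64Array};
   use arrow::datatypes::{DataType, Field, Schema};
   use arrow::record_batch::RecordBatch;

   fn main() {
       let field = Field::new("id", DataType::Int64, true);
       let col: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
       let batch = RecordBatch::try_new(
           Arc::new(Schema::new(vec![field.clone()])),
           vec![col.clone()],
       )
       .unwrap();

       // Same fields, extra schema metadata: the containment check still holds.
       let annotated = Arc::new(
           Schema::new(vec![field])
               .with_metadata(HashMap::from([("origin".into(), "plan".into())])),
       );

       let rebound = batch.with_schema(annotated).unwrap();
       // The rebound batch shares the original column buffers (no copy).
       assert!(Arc::ptr_eq(&col, rebound.column(0)));
   }
   ```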
   
   ### 3. Make review easier
   
   The value here is not just fewer lines of code. It is making the invariant 
visible:
   
   - this operator may receive structurally compatible batches
   - before emission, those batches must match the operator's declared output schema
   
   ### 4. Avoid premature broadening
   
   The payoff here appears moderate unless additional call sites emerge. That 
argues for a focused, low-complexity helper rather than a sweeping abstraction.
   
   ## Candidate Audit Targets
   
   This issue is most relevant for operators that:
   
   - merge results from independently planned branches
   - replay or buffer child batches and emit them later
   - forward batches without rebuilding them
   - rely on plan-level coercion while preserving child-produced batch metadata
   
   Examples worth auditing:
   
   - recursive execution paths
   - union-like or branch-combining execution nodes
   - interleave / merge utilities
   - serializer-facing boundaries where batch-local schemas matter
   
   ## Testing Plan
   
   Add focused tests that cover:
   
   - no-op behavior when the batch schema already matches the expected output 
schema
   - successful normalization when field names need to be rewritten to the 
declared schema
   - clear failure behavior when the requested normalization would violate 
column/type compatibility
   - at least one execution-level regression demonstrating that emitted batches 
match the operator's declared schema
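   The failure-behavior case could look roughly like this, assuming the rename path bottoms out in `RecordBatch::try_new`, whose own validation supplies the error:

   ```rust
   use std::sync::Arc;

   use arrow::array::{ArrayRef, Int64Array};
   use arrow::datatypes::{DataType, Field, Schema};
   use arrow::record_batch::RecordBatch;

   fn main() {
       // A batch with a single Int64 column.
       let batch = RecordBatch::try_new(
           Arc::new(Schema::new(vec![Field::new("n", DataType::Int64, true)])),
           vec![Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef],
       )
       .unwrap();

       // Declared schema with an incompatible column type: renaming is fine,
       // but try_new must reject the Utf8-vs-Int64 mismatch.
       let incompatible =
           Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, true)]));
       let result = RecordBatch::try_new(incompatible, batch.columns().to_vec());
       assert!(result.is_err());
   }
   ```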
   
   ## Expected Benefits
   
   - clearer execution-layer contracts around emitted batch schemas
   - less ad hoc schema-rebind logic at individual call sites
   - easier future auditing for similar bugs
   - more durable documentation of when Arrow's `with_schema(...)` is and is 
not the right tool
   
   ## Acceptance Criteria
   
   - a reusable schema-normalization helper or helper pair exists in the 
execution layer
   - the helper documents the distinction between simple schema rebinding and 
field-name normalization
   - `RecursiveQueryExec` either uses the helper or remains a 
clearly-documented intentional exception
   - focused tests cover the supported normalization behavior
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

