mbutrovich opened a new pull request, #4572:
URL: https://github.com/apache/datafusion-comet/pull/4572

   ## Which issue does this PR close?
   
   Closes #3770.
   
   > Note: peeled off the draft experimental PR #4393 (which is not intended to 
merge), and for now also carries the commits from #4507 (native shuffle 
optimizations), so the diff is temporarily inflated and will shrink once #4507 
merges. The description below covers only this PR's own scope (the Arrow C 
Stream Interface input path), not the #4507 native shuffle work.
   
   ## Rationale for this change
   
   - The JVM-to-native input path used a bespoke `CometBatchIterator` plus a 
per-batch FFI deep copy, guarded by an `arrow_ffi_safe` flag, because the JVM 
could reuse or mutate a batch's buffers after handing it off. Every batch 
crossing the boundary was copied.
   - The Arrow C Stream Interface is the canonical, zero-copy way to hand an 
Arrow stream across FFI with proper ownership transfer, so both the deep copy 
and the flag become unnecessary.
   
   ## What changes are included in this PR?
   
   - JVM exports each per-partition `Iterator[ColumnarBatch]` as an 
`org.apache.arrow.c.ArrowArrayStream` (`Data.exportArrayStream`); native takes 
ownership via `from_raw`. `CometBatchIterator.java` and the `arrow_ffi_safe` 
proto field/plumbing are removed.
   - `CometExecIterator` / `CometExecRDD` now pass an `Array[Object]` of 
already-exported `ArrowArrayStream` (or `CometShuffleBlockIterator`) slots 
instead of `CometBatchIterator`.
   - New `ArrowReader` implementations bridging Spark data to Arrow: 
`RowArrowReader` (`InternalRow`), `SparkColumnarArrowReader` (non-Arrow Spark 
`ColumnarBatch`), `ColumnarBatchArrowReader` (Arrow-backed `ColumnarBatch`, 
with VSR ownership transfer).
   - New `CometNativeArrowSource` trait: an operator supplies one per-partition 
reader and gets both the JVM columnar path (`doExecuteColumnar`) and the native 
C Stream path (`doExecuteAsArrowStream`). Implemented by 
`CometLocalTableScanExec` and `CometSparkToColumnarExec`.
   - Native `AlignedArrowStreamReader` wraps arrow-rs's stream reader to align 
buffers per imported batch (the JVM exports 8-byte-aligned buffers, which trip 
arrow-rs's alignment assertion). This is a temporary workaround: upstream 
apache/arrow-rs#10030 fixes it and ships in arrow 59.0.0, after which this 
reader can be dropped. `scan.rs` drops the per-batch deep copy.
   - `reconcileStreamSchema` advertises the truthful first-batch Arrow schema 
(not the consumer's declared types) so native `ScanExec`'s boundary cast fires; 
logs one deduped warning per type drift (e.g. `width_bucket` return-type drift).
   
   ## How are these changes tested?
   
   - Existing suites exercise the input path end to end (`CometExecSuite`, 
`CometShuffleSuite`, `ParquetReadSuite`, the fuzz suites).
   - New `CometArrowStreamSuite` covering stream export and schema 
reconciliation, added to the Linux and macOS PR build workflows.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to