andygrove opened a new pull request, #4569:
URL: https://github.com/apache/datafusion-comet/pull/4569

   ## Which issue does this PR close?
   
   Closes #.
   
   <!-- No tracking issue yet. Opening as a draft to gather feedback on the 
design; happy to file a tracking issue if there is interest. -->
   
   ## Rationale for this change
   
   When a DataFrame or table is cached (`df.cache()` / `CACHE TABLE`), Spark's 
`DefaultCachedBatchSerializer` stores each column in Spark's compressed 
columnar format. Comet does not treat `InMemoryTableScanExec` as native, so it 
inserts a `CometSparkToColumnarExec` above it and pays a JVM-to-Arrow 
conversion on every read of the cached data:
   
   ```
   cached (compressed) -> decompress to Spark ColumnarBatch -> convert to Arrow 
-> native
   ```
   
   That conversion runs on every scan, which undercuts the benefit of caching 
for native pipelines. This PR lets Comet store the cache as compressed Arrow 
IPC once, at cache-build time, so repeated scans feed native execution directly 
with no per-read conversion. This is the same approach used by other columnar 
Spark accelerators.
   
   ## What changes are included in this PR?
   
   A new `CometCachedBatchSerializer` (plugged into Spark's 
`spark.sql.cache.serializer`) that:
   
   - Encodes each cached batch to compressed Arrow IPC (reusing Comet's 
existing `serializeBatches`/`decodeBatches`), storing the bytes plus a 
Spark-format per-column stats row.
   - Extends Spark's `SimpleMetricsCachedBatchSerializer`, so batch-level 
partition pruning (`buildFilter`) works using the computed min/max/null/count 
stats.
   - Decodes back to `CometVector`-backed `ColumnarBatch` on read, with column 
pruning and an `InternalRow` fallback for non-Comet consumers.
   - Delegates transparently to Spark's `DefaultCachedBatchSerializer` for 
schemas it does not support (nested/complex types), so it is a safe drop-in.
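
The batch-level pruning described above can be illustrated with a small model. This is a sketch in plain Python, not Comet or Spark code: `ColumnStats`, `compute_stats`, and `may_contain_greater_than` are hypothetical names, and the stats layout is simplified to a single integer column. It only shows why storing min/max/null/count per batch lets a scan skip batches without decoding them.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ColumnStats:
    """Per-column stats for one cached batch (illustrative layout only)."""
    lower: Optional[int]   # min of the non-null values, None if all are null
    upper: Optional[int]   # max of the non-null values, None if all are null
    null_count: int
    row_count: int


def compute_stats(values):
    """Compute min/max/null/count once, when the batch is cached."""
    non_null = [v for v in values if v is not None]
    return ColumnStats(
        lower=min(non_null) if non_null else None,
        upper=max(non_null) if non_null else None,
        null_count=len(values) - len(non_null),
        row_count=len(values),
    )


def may_contain_greater_than(stats, threshold):
    """Return False only when no row in the batch can satisfy `col > threshold`."""
    if stats.upper is None:
        # Every value is null, so the predicate can never hold.
        return False
    return stats.upper > threshold


batches = [[1, 2, 3], [10, None, 30], [None, None]]
stats = [compute_stats(b) for b in batches]

# Only batches that might match the filter `col > 5` need to be decoded.
to_decode = [i for i, s in enumerate(stats) if may_contain_greater_than(s, 5)]
print(to_decode)
```

In this model only the second batch survives the filter; the other two are skipped using the stats alone, without touching their encoded bytes.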
   
   Supporting changes:
   
   - `CometSparkToColumnarExec` gains a passthrough fast-path: batches whose 
columns are already `CometVector` are forwarded without a re-copy (with a 
`numPassthroughBatches` metric).
   - `CometDriverPlugin` installs the serializer at startup when the new 
`spark.comet.cache.serializer.enabled` config (default off) is set, respecting 
any user-provided serializer. Spark's `spark.sql.cache.serializer` is a static 
config, so it must be set before the session is created.
   - New config `spark.comet.cache.serializer.enabled` (default off).
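
The install rule for the driver plugin can be sketched as a small decision function. This is a model in plain Python over a dictionary of settings, not the plugin's actual code: `resolve_serializer` is a hypothetical helper, and `COMET_SERIALIZER` is a placeholder because the description above does not give the fully qualified class name. The two config keys come from the description, and the default value shown is Spark's built-in serializer class.

```python
SPARK_SERIALIZER_KEY = "spark.sql.cache.serializer"
COMET_ENABLED_KEY = "spark.comet.cache.serializer.enabled"
SPARK_DEFAULT = (
    "org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer"
)
# Placeholder: the fully qualified class name is not stated in this PR text.
COMET_SERIALIZER = "CometCachedBatchSerializer"


def resolve_serializer(conf):
    """Return the serializer that should be active for the given settings."""
    current = conf.get(SPARK_SERIALIZER_KEY, SPARK_DEFAULT)
    enabled = conf.get(COMET_ENABLED_KEY, "false").lower() == "true"
    if not enabled:
        # Feature is off by default: leave whatever is configured.
        return current
    if current != SPARK_DEFAULT:
        # A user-provided, non-default serializer is never overridden.
        return current
    return COMET_SERIALIZER


print(resolve_serializer({}))
print(resolve_serializer({COMET_ENABLED_KEY: "true"}))
print(resolve_serializer({COMET_ENABLED_KEY: "true",
                          SPARK_SERIALIZER_KEY: "com.example.MySerializer"}))
```

The third call uses a made-up class name, `com.example.MySerializer`, to stand in for a user-provided serializer.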
   
   Supported flat types: boolean, integral, floating point, decimal, string, 
binary, date, timestamp, timestamp_ntz. Schemas containing nested types 
delegate to Spark's `DefaultCachedBatchSerializer`.
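
As a rough illustration of the delegation rule, the schema check could look like the following. This is a plain Python sketch, not the serializer's code: `base_type` and `choose_serializer` are hypothetical helpers, schemas are modelled as `(name, type)` pairs using Spark SQL type names, and mapping "integral" to `tinyint`/`smallint`/`int`/`bigint` and "floating point" to `float`/`double` is an assumption.

```python
# Assumed concrete names for the flat types listed above.
FLAT_TYPES = {
    "boolean",
    "tinyint", "smallint", "int", "bigint",   # "integral" (assumption)
    "float", "double",                        # "floating point" (assumption)
    "decimal", "string", "binary",
    "date", "timestamp", "timestamp_ntz",
}


def base_type(type_name):
    """Strip parameters: 'decimal(10,2)' -> 'decimal', 'array<int>' -> 'array'."""
    for sep in "(<":
        type_name = type_name.split(sep, 1)[0]
    return type_name.strip().lower()


def choose_serializer(schema):
    """Pick a serializer for the whole schema, not per column."""
    if all(base_type(t) in FLAT_TYPES for _, t in schema):
        return "comet"
    # Any nested or otherwise unsupported column sends the whole schema
    # to Spark's default serializer.
    return "spark-default"


flat = [("id", "bigint"), ("price", "decimal(10,2)"), ("ts", "timestamp_ntz")]
nested = flat + [("tags", "array<string>")]
print(choose_serializer(flat))
print(choose_serializer(nested))
```

The check is made per schema because the description above says unsupported schemas are delegated as a whole.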
   
   ## How are these changes tested?
   
   New tests:
   
   - `CometCachedBatchSerializerSuite`: stats-row layout; build path 
(compressed IPC + stats); decode round-trip; column pruning; the columnar read 
path (identity and pruned projections); the `CometSparkToColumnarExec` 
passthrough metric; a regression test that string min/max stats survive 
encoding (they are copied off the Arrow buffer); and end-to-end tests for 
cached-vs-uncached correctness, filtered pruning on numeric and string columns, 
`MEMORY_AND_DISK` spill, array-type delegation, and `timestamp_ntz` value 
round-tripping.
   - `CometPluginsSuite`: the driver plugin installs the serializer only when 
enabled and never overrides a user-provided non-default serializer.
   
   Verified compiling and passing on Spark 3.4, 3.5, and 4.x profiles.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

