schenksj opened a new pull request, #3932:
URL: https://github.com/apache/datafusion-comet/pull/3932
## Summary
Adds native Delta Lake read support to Comet using `delta-kernel-rs` for log
replay, matching all optimizations in the existing Iceberg native scan path.
Delta tables (``spark.sql("SELECT ... FROM delta.`/path`")``) now execute
through `CometDeltaNativeScanExec` → protobuf `DeltaScan` → Rust planner →
Comet's tuned `ParquetSource`, preserving every Comet Parquet-read optimization
(parallel I/O, range merging, page-index filtering, schema adapter for Spark
semantics).
## Design
### Architecture
```
Driver (Scala)                             Executor (Rust)
─────────────────                          ─────────────────
CometScanRule                              OpStruct::DeltaScan match arm
 └─ detect DeltaParquetFileFormat           └─ deserialize DeltaScanCommon
 └─ stripDeltaDvWrappers                    └─ apply column mapping to data_schema
 └─ nativeDeltaScan validation              └─ rewrite filters (ColumnMappingFilterRewriter)
                                            └─ build PartitionedFiles from tasks
CometExecRule                               └─ split by DV presence
 └─ CometDeltaNativeScan.convert()          └─ init_datasource_exec (ParquetSource)
 └─ JNI: Native.planDeltaScan()             └─ wrap with DeltaDvFilterExec (if DVs)
    └─ delta-kernel-rs log replay
    └─ DV materialization
    └─ column mapping extraction
    └─ partition pruning (static)
    └─ serialize DeltaScanCommon proto

CometDeltaNativeScanExec
 └─ doPrepare() (DPP subqueries)
 └─ serializedPartitionData (lazy)
    └─ apply DPP filters
    └─ per-file split-mode serialization
 └─ DeltaPlanDataInjector (LRU-cached)
```
### Key Design Decisions
1. **Kernel on driver, ParquetSource on executors** — `delta-kernel-rs`
handles log replay + file enumeration once on the driver via JNI. Data reads go
through Comet's existing `ParquetSource` (not kernel's `ArrowReader`),
inheriting all Comet optimizations.
2. **Arrow version isolation** — the kernel pins arrow-57 / object_store-0.12
internally; Comet uses arrow-58 / object_store-0.13. Only plain Rust types
(`String`, `HashMap`, `Vec<u64>`) cross the boundary (see the sketch after
this list), so both arrow versions coexist in the dependency tree without
conflict.
3. **Detection by class name** — `DeltaReflection` uses string-based class
name matching (no compile-time dep on `spark-delta`), same pattern as Iceberg's
`SparkBatchQueryScan` detection.
4. **DV handling via plan-tree rewrite** — Delta's `PreprocessTableWithDVs`
Catalyst strategy injects synthetic `__delta_internal_is_row_deleted` columns.
`stripDeltaDvWrappers` undoes this at scan-rule time, and
`CometDeltaDvConfigRule` disables the incompatible `useMetadataRowIndex`
strategy automatically.
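To make decision 2 concrete, here is a minimal sketch of the kind of payload that crosses the driver → executor boundary. The struct and field names are illustrative stand-ins, not the PR's actual `DeltaScanCommon` definition:

```rust
use std::collections::HashMap;

/// Hypothetical shape of the data crossing the driver -> executor
/// boundary. Only std types appear, so neither arrow-57 (inside
/// delta-kernel-rs) nor arrow-58 (inside Comet) leaks across.
struct DeltaScanPayload {
    /// Parquet data files selected by log replay on the driver.
    file_paths: Vec<String>,
    /// Physical -> logical column mapping (mode=id / mode=name).
    column_mapping: HashMap<String, String>,
    /// Deletion-vector row indexes per file, materialized on the
    /// driver into plain u64 offsets.
    deleted_rows: HashMap<String, Vec<u64>>,
    /// Partition values, kept as strings until the executor casts them.
    partition_values: HashMap<String, String>,
}
```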
## Capabilities
### Phases Implemented
| Phase | Feature | Status |
|-------|---------|--------|
| 0 | Dependency spike (delta_kernel + object_store + roaring) | ✅ |
| 1 | Read-only happy path (unpartitioned, no DV, no column mapping) | ✅ |
| 2 | Predicate pushdown (Catalyst → kernel predicate translation, stats-based file pruning; sketch after this table) | ✅ |
| 3 | Deletion vectors (inline + on-disk, materialized on driver, applied via DeltaDvFilterExec) | ✅ |
| 4 | Column mapping (mode=id and mode=name, schema evolution with missing columns) | ✅ |
| 5 | Split-mode serialization, per-file parallelism, partition pruning | ✅ |
| 5b | Dynamic Partition Pruning (DPP via doPrepare + deferred task filtering) | ✅ |
| 6 | Reader-feature gate (unsupported features → tagged fallback, not silent wrong results) | ✅ |
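To give a feel for what phase 2's translator does, here is a self-contained sketch. Both enums are invented stand-ins for illustration; the real code in `predicate.rs` targets delta-kernel-rs's own expression types:

```rust
/// Invented stand-in for a Catalyst filter as it arrives natively.
enum CatalystFilter {
    EqualTo(String, i64),
    LessThan(String, i64),
    And(Box<CatalystFilter>, Box<CatalystFilter>),
    /// Anything untranslatable (UDFs, unsupported types, ...).
    Opaque,
}

/// Invented stand-in for the kernel's predicate type.
enum KernelPredicate {
    Eq(String, i64),
    Lt(String, i64),
    And(Box<KernelPredicate>, Box<KernelPredicate>),
}

/// Translate what we can; returning None just means "no stats-based
/// file pruning for this filter", never wrong results, because the
/// full filter is still applied during the Parquet read.
fn translate(f: &CatalystFilter) -> Option<KernelPredicate> {
    match f {
        CatalystFilter::EqualTo(c, v) => Some(KernelPredicate::Eq(c.clone(), *v)),
        CatalystFilter::LessThan(c, v) => Some(KernelPredicate::Lt(c.clone(), *v)),
        CatalystFilter::And(l, r) => Some(KernelPredicate::And(
            Box::new(translate(l)?),
            Box::new(translate(r)?),
        )),
        CatalystFilter::Opaque => None,
    }
}
```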
### Supported Delta Features
- Partitioned and unpartitioned tables
- Schema evolution (mergeSchema, missing columns → null)
- Time travel (VERSION AS OF, TIMESTAMP AS OF)
- Column mapping modes: none, id, name (including rename after write)
- Deletion vectors (inline bitmaps + on-disk UUID files)
- Stats-based file pruning via kernel predicates
- Data filter pushdown into ParquetSource
- Dynamic partition pruning through joins
- Multi-column partitioning with typed columns (int, long, date, string,
etc.)
- Complex types (array, map, struct, deeply nested)
- Cloud storage (S3/S3A, Azure ABFSS, GCS, local filesystem)
- Protocol feature gating (rowTracking, typeWidening → graceful fallback; see the sketch below)
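The feature gate behind the last bullet is essentially an allowlist check before planning. A minimal sketch, assuming a hypothetical helper and an illustrative supported set (the real gate sits behind the kernel's unsupported-features reporting):

```rust
use std::collections::HashSet;

/// Reader features this scan path knows how to honor (illustrative
/// subset, not the PR's authoritative list).
fn supported_reader_features() -> HashSet<&'static str> {
    ["deletionVectors", "columnMapping", "timestampNtz"]
        .into_iter()
        .collect()
}

/// Features we must refuse, so the Scala side can tag the scan for
/// fallback instead of returning silently wrong results.
fn unsupported_features(table_features: &[String]) -> Vec<String> {
    let supported = supported_reader_features();
    table_features
        .iter()
        .filter(|f| !supported.contains(f.as_str()))
        .cloned()
        .collect()
}
```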
## Iceberg Parity
Every optimization in Comet's Iceberg path has a Delta equivalent:
| # | Feature | Iceberg | Delta |
|---|---------|---------|-------|
| 1 | Split-mode serialization | ✅ lazy val + IcebergPlanDataInjector | ✅ lazy val + DeltaPlanDataInjector |
| 2 | DPP support | ✅ doPrepare() + SubqueryAdaptiveBroadcastExec | ✅ doPrepare() + applyDppFilters() |
| 3 | LRU cache in PlanDataInjector | ✅ 16-entry synchronized LinkedHashMap | ✅ identical pattern |
| 4 | ImmutableSQLMetric | ✅ prevents accumulator merge overwrites | ✅ identical pattern |
| 5 | Planning metrics | ✅ Iceberg V2 custom metrics | ✅ total_files, dv_files |
| 6 | Runtime metrics | ✅ output_rows, num_splits | ✅ output_rows, num_splits |
| 7 | doExecuteColumnar() | ✅ explicit CometExecRDD | ✅ identical pattern |
| 8 | convertBlock() | ✅ preserves @transient fields | ✅ identical pattern |
| 9 | Filesystem scheme validation | ✅ 9 schemes | ✅ same 9 schemes |
| 10 | Schema adapter | ✅ SparkPhysicalExprAdapterFactory | ✅ same adapter |
| 11 | Delete handling | ✅ iceberg-rust ArrowReader MOR | ✅ DeltaDvFilterExec per-batch masking |
| 12 | Config gating | ✅ COMET_ICEBERG_NATIVE_ENABLED | ✅ COMET_DELTA_NATIVE_ENABLED |
| 13 | Feature fallback | ✅ format version check | ✅ kernel unsupported_features gate |
| 14 | Cloud credentials | ✅ Hadoop→Iceberg key mapping | ✅ Hadoop→kernel dual-key lookup (sketch after this table) |
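Row 14's "dual-key lookup" can be pictured as below. The helper and both example key strings are assumptions for illustration; the PR's `engine.rs` defines the actual mapping:

```rust
use std::collections::HashMap;

/// Hypothetical sketch: resolve one credential by consulting the
/// Hadoop-style key first, then the kernel/object_store-style key.
fn lookup_dual(
    hadoop_conf: &HashMap<String, String>,
    engine_opts: &HashMap<String, String>,
    hadoop_key: &str, // e.g. "fs.s3a.access.key"
    kernel_key: &str, // e.g. "aws_access_key_id"
) -> Option<String> {
    hadoop_conf
        .get(hadoop_key)
        .or_else(|| engine_opts.get(kernel_key))
        .cloned()
}
```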
### Intentional Differences (by design, not gaps)
- **Rust execution**: Iceberg uses dedicated `IcebergScanExec` with
iceberg-rust `ArrowReader`; Delta reuses `init_datasource_exec` → Comet's
`ParquetSource` (gets parallel I/O and range merging for free)
- **Proto dedup pools**: Iceberg has 8 deduplication pools for repeated
schemas/partitions; Delta tasks are simpler and don't need pools
- **Scan rule validation depth**: Iceberg validates 11+ conditions via
reflection; Delta delegates most to kernel's built-in validation
## New Files
| File | Purpose |
|------|---------|
| `native/core/src/delta/mod.rs` | Module root, quarantine documentation |
| `native/core/src/delta/scan.rs` | `plan_delta_scan_with_predicate()` — kernel log replay |
| `native/core/src/delta/engine.rs` | `DeltaStorageConfig` + `create_engine()` (S3/Azure/local) |
| `native/core/src/delta/jni.rs` | `Java_org_apache_comet_Native_planDeltaScan` JNI entry |
| `native/core/src/delta/predicate.rs` | Catalyst → kernel predicate translator |
| `native/core/src/delta/error.rs` | `DeltaError` enum |
| `native/core/src/execution/operators/delta_dv_filter.rs` | `DeltaDvFilterExec` — per-batch DV row masking (sketch after this table) |
| `spark/.../CometDeltaNativeScanExec.scala` | Split-mode exec with DPP, metrics, lazy serialization |
| `spark/.../CometDeltaNativeScan.scala` | Serde: JNI call, partition pruning, proto construction |
| `spark/.../DeltaReflection.scala` | Class-name detection, table root/version extraction |
| `spark/.../CometDeltaDvConfigRule` | Auto-configures useMetadataRowIndex=false |
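For the DV row above, the per-batch masking in `DeltaDvFilterExec` boils down to building a boolean keep-mask from the materialized bitmap and filtering the batch. A minimal sketch, assuming the batch's file-relative starting row index is known (function and parameter names are illustrative, not the operator's actual API):

```rust
use arrow::array::BooleanArray;
use arrow::compute::filter_record_batch;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use roaring::RoaringTreemap; // 64-bit roaring bitmap of deleted row indexes

fn mask_deleted_rows(
    batch: &RecordBatch,
    dv: &RoaringTreemap,  // row indexes marked deleted in this file
    first_row_index: u64, // file-relative index of the batch's first row
) -> Result<RecordBatch, ArrowError> {
    // Keep row i iff its file-relative index is absent from the DV.
    let keep: BooleanArray = (0..batch.num_rows() as u64)
        .map(|i| Some(!dv.contains(first_row_index + i)))
        .collect();
    filter_record_batch(batch, &keep)
}
```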
## Configuration
| Config | Default | Description |
|--------|---------|-------------|
| `spark.comet.scan.deltaNative.enabled` | `false` | Enable native Delta scan |
| `spark.comet.scan.deltaNative.dataFileConcurrencyLimit` | `1` | Concurrent file reads per task (2-8 suggested; sketch after this table) |
| `spark.comet.scan.deltaNative.fallbackOnUnsupportedFeature` | `true` | Fall back to Spark on unsupported reader features |
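What `dataFileConcurrencyLimit` governs can be sketched as a bounded-concurrency stream over a task's files. This illustrates the semantics of the setting, not the PR's reader code; `tokio::fs::read` stands in for the real object-store/Parquet range reads:

```rust
use futures::stream::{self, StreamExt};

/// Read `files` with at most `limit` reads in flight at once, which is
/// what the config value bounds per task.
async fn read_all(files: Vec<String>, limit: usize) -> Vec<Vec<u8>> {
    stream::iter(files)
        .map(|path| async move {
            // Stand-in for an object-store/Parquet range read.
            tokio::fs::read(&path).await.unwrap_or_default()
        })
        .buffer_unordered(limit)
        .collect()
        .await
}
```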
## Test Plan
- [x] **CometDeltaNativeSuite** (26 tests) — core reads, projections,
filters, partitioning, schema evolution, time travel, complex types, primitive
coverage
- [x] **CometDeltaColumnMappingSuite** (5 tests) — column mapping (name/id),
deletion vectors, DV + column mapping, column mapping + schema evolution
- [x] **CometDeltaAdvancedSuite** (11 tests) — joins, aggregations, unions,
window functions, DPP, DPP file pruning, planning metrics, filesystem scheme
validation
- [x] **CometFuzzDeltaSuite** — property-based testing with random schemas
- [x] **DeltaReadFromS3Suite** — MinIO-based S3 integration tests
- [x] All 82 tests passing (Spark 3.5)