toutane opened a new pull request, #2298:
URL: https://github.com/apache/iceberg-rust/pull/2298

   ## Which issue does this PR close?
   
   - Closes: #2220
   - Related:
       - #1604 (EPIC: sub-file parallelism, long-term direction)
       - #128 (size-based planning)
   
   ## What changes are included in this PR?
   
   ### Approach
   
   Rather than introducing new types (`IcebergPartitionedScan`, 
`IcebergPartitionedTableProvider` as originally proposed), this PR extends the 
existing `IcebergTableProvider` / `IcebergTableScan` with an eager mode where 
file scan tasks are planned at `scan()` time and distributed into buckets, one 
bucket per DataFusion partition.
   
   The main motivation is to let DataFusion schedule file reads concurrently. 
Previously all files streamed through a single partition 
(`UnknownPartitioning(1)`); now `IcebergTableProvider::scan` distributes tasks 
across `min(target_partitions, n_files)` partitions, and declares 
`Partitioning::Hash` when the data is identity-partitioned.
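
   As a rough illustration of the partition count described above, the 
sketch below caps the bucket count at the file count and keeps one empty bucket 
for an empty table. The names `bucket_count` and `bucket_round_robin` are 
hypothetical, and round-robin assignment is an assumption made for the example; 
it is not taken from the PR's `bucketing` module.

```rust
/// Number of buckets (one per DataFusion partition) for a scan.
/// Capped at the file count; an empty table still gets one (empty) bucket.
fn bucket_count(target_partitions: usize, n_files: usize) -> usize {
    target_partitions.min(n_files).max(1)
}

/// Distribute tasks into buckets.
/// ASSUMPTION: round-robin assignment, chosen only for illustration.
fn bucket_round_robin<T>(tasks: Vec<T>, target_partitions: usize) -> Vec<Vec<T>> {
    let n = bucket_count(target_partitions, tasks.len());
    let mut buckets: Vec<Vec<T>> = (0..n).map(|_| Vec::new()).collect();
    for (i, task) in tasks.into_iter().enumerate() {
        buckets[i % n].push(task);
    }
    buckets
}
```

   With five files and `target_partitions = 2`, this yields two buckets of 
three and two tasks; with `target_partitions = 1` everything lands in a single 
bucket.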
   
   ### Key changes
   
   - **`TableScan::to_arrow_from_tasks`** - New public method on `TableScan` 
that accepts a pre-collected `FileScanTaskStream` instead of calling 
`plan_files()` internally. This is the hook used by 
`IcebergTableScan::execute(i)` to replay each bucket through the Arrow reader 
while preserving all reader-side configuration (concurrency limit, row-group 
filtering, batch size). Tasks must come from a `TableScan` with the same 
projection and filters as `self` - predicates are baked into each task at 
planning time and are not re-applied by the reader. The doc comment makes this 
contract explicit.
       
   - **`IcebergTableScan` is now `pub`** - Previously `pub(crate)`. Made public 
so that downstream integrations that need to inspect or wrap the physical plan 
can do so without going through the table provider.
       
   - **`with_new_children` now returns an error** - `IcebergTableScan` is a 
leaf node and does not support children. Previously the implementation silently 
dropped any children passed to it; it now returns `DataFusionError::Internal` 
when `children` is non-empty, matching the contract of `IcebergCommitExec`.
       
   - **Eager task planning in `IcebergTableProvider::scan`** - `plan_files()` 
is now called at planning time (inside `TableProvider::scan`) rather than at 
execution time (inside `ExecutionPlan::execute`). The collected tasks are 
distributed into `min(target_partitions, n_files)` buckets by 
`bucketing::bucket_tasks` and stored in the scan. Each `execute(i)` call then 
fetches its pre-assigned bucket and streams it through `to_arrow_from_tasks` - 
no redundant metadata reads per partition.
       
   - **`bucketing` module** - Handles bucket assignment and `Partitioning` 
declaration. For tables with a single partition spec using only identity 
transforms, tasks are hashed on their partition values using DataFusion's 
`create_hashes` + `REPARTITION_RANDOM_STATE`, and the scan declares 
`Partitioning::Hash`. This lets DataFusion recognize that the output is already 
hash-partitioned and skip a downstream `RepartitionExec`. Non-identity 
transforms (`bucket`, `truncate`, `year`/`month`/`day`/`hour`) are lossy: the 
partition value in task metadata does not match what DataFusion would compute 
by hashing the actual column values, so those cases fall back to 
`UnknownPartitioning`. Any task that cannot be fully hashed with the identity 
key (unsupported literal type, null partition value) also falls back.  
       _Credit: This bucketing solution was proposed by @timsaucer._
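
   The fallback rules for the `Partitioning` declaration can be sketched as 
a small decision function. This is a simplified model, not the PR's code: 
`Transform`, `PartitionField`, `Declared`, and `declare_partitioning` are 
hypothetical stand-ins defined locally, and `all_tasks_hashable` abstracts the 
per-task check for unsupported literal types and null partition values.

```rust
/// Hypothetical, simplified model of Iceberg partition transforms.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum Transform { Identity, Bucket(u32), Truncate(u32), Year, Month, Day, Hour }

/// Hypothetical partition field: source column plus its transform.
struct PartitionField { source_column: String, transform: Transform }

/// Stand-in for the partitioning a scan would declare to DataFusion.
#[derive(Debug, PartialEq)]
enum Declared {
    Hash { columns: Vec<String>, partitions: usize },
    Unknown { partitions: usize },
}

fn declare_partitioning(
    specs: &[Vec<PartitionField>],
    projected_columns: &[&str],
    all_tasks_hashable: bool,
    n_buckets: usize,
) -> Declared {
    let unknown = Declared::Unknown { partitions: n_buckets };
    // Spec evolution (more than one spec) conservatively disables Hash.
    if specs.len() != 1 {
        return unknown;
    }
    let spec = &specs[0];
    // An unpartitioned table has nothing to hash on.
    if spec.is_empty() {
        return unknown;
    }
    // Non-identity transforms are lossy; projected-away columns cannot be referenced.
    let identity_only = spec.iter().all(|f| f.transform == Transform::Identity);
    let all_projected = spec
        .iter()
        .all(|f| projected_columns.contains(&f.source_column.as_str()));
    if !identity_only || !all_projected || !all_tasks_hashable {
        return unknown;
    }
    Declared::Hash {
        columns: spec.iter().map(|f| f.source_column.clone()).collect(),
        partitions: n_buckets,
    }
}
```

   Under this model, a table partitioned by `identity(region)` declares a 
hash partitioning on `region` only while `region` stays in the projection and 
every task's partition tuple can be hashed.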
       
   
   ### Design choices - planning at `scan()` time vs. at `execute()` time
   
   Planning eagerly at `scan()` time is a deliberate trade-off:
   
   - **Pro:** Tasks are computed once and shared across all partitions; the 
plan is reproducible; `execute(i)` is pure I/O with no catalog round-trips.
   - **Con:** `TableProvider::scan` now does network I/O (catalog + metadata 
reads), which is unusual for a planning-phase method. An alternative design - 
planning lazily at execute time - would keep `scan()` cheap but would require 
one redundant `plan_files()` call per partition. A future extension could 
expose lazy planning as an option for use cases where snapshot freshness 
matters more than plan reproducibility.
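
   To make the trade-off concrete, here is a toy model that counts planning 
calls under each design. `Planner`, `EagerScan`, and `LazyScan` are 
hypothetical types invented for this sketch; the real code goes through 
`TableScan::plan_files` and async streams, which are omitted here.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a catalog-backed planner; counts `plan_files` calls.
struct Planner {
    files: Vec<String>,
    calls: AtomicUsize,
}

impl Planner {
    fn plan_files(&self) -> Vec<String> {
        self.calls.fetch_add(1, Ordering::SeqCst);
        self.files.clone()
    }
}

/// Eager design: plan once at scan() time, then each execute(i) only reads its bucket.
struct EagerScan {
    buckets: Arc<Vec<Vec<String>>>,
}

impl EagerScan {
    fn scan(planner: &Planner, target_partitions: usize) -> Self {
        let tasks = planner.plan_files();
        let n = target_partitions.min(tasks.len()).max(1);
        let mut buckets = vec![Vec::new(); n];
        // ASSUMPTION: round-robin assignment, for illustration only.
        for (i, task) in tasks.into_iter().enumerate() {
            buckets[i % n].push(task);
        }
        EagerScan { buckets: Arc::new(buckets) }
    }

    fn partition_count(&self) -> usize {
        self.buckets.len()
    }

    fn execute(&self, i: usize) -> &[String] {
        &self.buckets[i]
    }
}

/// Lazy alternative: every execute(i) re-plans and keeps only its share.
struct LazyScan<'a> {
    planner: &'a Planner,
    partitions: usize,
}

impl<'a> LazyScan<'a> {
    fn execute(&self, i: usize) -> Vec<String> {
        self.planner
            .plan_files()
            .into_iter()
            .enumerate()
            .filter(|(idx, _)| idx % self.partitions == i)
            .map(|(_, task)| task)
            .collect()
    }
}
```

   In this model, executing all partitions of an `EagerScan` triggers one 
planning call in total, while a `LazyScan` with three partitions triggers 
three.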
   
   ### Known limitations
   
   - **Limited type support for `Partitioning::Hash`** - `literal_to_array` 
supports seven primitive Arrow types (`Bool`, `Int32`, `Int64`, `Float32`, 
`Float64`, `Utf8`, `Date32`). Timestamps, `Decimal128`, `LargeUtf8`, etc. are 
not yet covered; any unsupported type forces fallback to `UnknownPartitioning`.
       
   - **Spec evolution disables `Partitioning::Hash`** - If the table has more 
than one historical partition spec, the bucketing module conservatively returns 
`UnknownPartitioning` to avoid mismatches between old and new partition tuple 
layouts.
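
   The type-support limitation can be modelled as an all-or-nothing check 
over the partition tuples of every task. This sketch is an assumption-laden 
simplification: `LiteralKind`, `is_supported`, and `can_declare_hash` are 
hypothetical names, and the enum only mirrors the types named above rather than 
the actual `literal_to_array` signature.

```rust
/// Hypothetical tag for the type of a partition literal.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum LiteralKind {
    Bool, Int32, Int64, Float32, Float64, Utf8, Date32,
    // Listed in the PR as not yet covered:
    Timestamp, Decimal128, LargeUtf8,
}

/// True for the seven types the PR lists as supported.
fn is_supported(kind: LiteralKind) -> bool {
    matches!(
        kind,
        LiteralKind::Bool
            | LiteralKind::Int32
            | LiteralKind::Int64
            | LiteralKind::Float32
            | LiteralKind::Float64
            | LiteralKind::Utf8
            | LiteralKind::Date32
    )
}

/// Each task carries one partition tuple; `None` models a null partition value.
/// A single unsupported or null value forces the fallback to unknown partitioning.
fn can_declare_hash(task_tuples: &[Vec<Option<LiteralKind>>]) -> bool {
    task_tuples
        .iter()
        .all(|tuple| tuple.iter().all(|v| matches!(v, Some(k) if is_supported(*k))))
}
```

   The check is deliberately strict: one timestamp-partitioned or 
null-valued task is enough to lose the hash declaration for the whole scan in 
this model.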
       
   
   ## Are these changes tested?
   
   **Unit tests** in `table/mod.rs` covering the new bucketed scan path:
   
   - `test_empty_table_single_empty_bucket` - Empty table produces one empty 
bucket, guarding against out-of-bounds panic on `execute(0)`.
   - `test_unpartitioned_falls_back_to_unknown` - Unpartitioned table declares 
`UnknownPartitioning`.
   - `test_bucket_count_capped_at_file_count` - When `target_partitions > 
n_files`, bucket count is capped at `n_files`.
   - `test_single_target_partition_single_bucket` - `target_partitions=1` 
produces a single bucket regardless of file count, reproducing the original 
single-threaded behavior.
   - `test_identity_partitioned_declares_hash` - Identity-partitioned table 
declares `Partitioning::Hash` referencing the partition column.
   - `test_projection_without_partition_col_falls_back_to_unknown` - Projecting 
away an identity column falls back to `UnknownPartitioning`.
   
   Additional tests are added for `IcebergTableProvider` to cover limit 
pushdown, insert behavior, and schema consistency, ensuring the refactor 
introduces no regressions on existing functionality.
   
   **SQL logic tests** - `EXPLAIN` snapshots are updated to reflect the new 
`buckets:[N] file_count:[M]` display format and the correct `input_partitions` 
counts.
   
   **Production validation** - We plan to test these changes in our 
infrastructure by shadowing real-world queries.
   
   ## Follow-up work
   
   - **Redundant `FilterExec`** - @timsaucer reports that 
`supports_filters_pushdown` returns `Inexact` for all filters, causing 
DataFusion to insert a `FilterExec` above `IcebergTableScan` even though the 
Arrow reader already applies the predicate via `ArrowPredicateFn`. Returning 
`Exact` for losslessly-converted filters would eliminate this redundant 
re-evaluation.  
       He proposed a solution in earlier commits, but those have been reverted 
as they are out of scope for this PR. This issue is tracked in #2363.
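
   The intended follow-up behavior might look roughly like the sketch 
below. `PushDown` is a local mirror of DataFusion's 
`TableProviderFilterPushDown` variants, redefined so the example compiles on 
its own; `Conversion`, `classify`, and `needs_filter_exec` are hypothetical, 
and mapping non-convertible filters to `Unsupported` is an assumption, not 
something stated in this PR or in #2363.

```rust
/// Local mirror of DataFusion's `TableProviderFilterPushDown` variants.
#[derive(Debug, Clone, Copy, PartialEq)]
enum PushDown { Unsupported, Inexact, Exact }

/// Hypothetical outcome of converting a DataFusion filter to an Iceberg predicate.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Conversion { Lossless, Partial, NotConvertible }

/// One pushdown decision per filter, in the same order as the input.
fn classify(conversions: &[Conversion]) -> Vec<PushDown> {
    conversions
        .iter()
        .map(|c| match c {
            Conversion::Lossless => PushDown::Exact,
            Conversion::Partial => PushDown::Inexact,
            // ASSUMPTION: filters that cannot be converted are reported as unsupported.
            Conversion::NotConvertible => PushDown::Unsupported,
        })
        .collect()
}

/// DataFusion re-applies any filter that is not reported as `Exact`.
fn needs_filter_exec(decisions: &[PushDown]) -> bool {
    decisions.iter().any(|d| *d != PushDown::Exact)
}
```

   When every filter converts losslessly, no decision other than `Exact` 
is returned and the extra `FilterExec` is no longer needed in this model.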
   
   ## Note
   
   `IcebergStaticTableProvider` is unchanged - it still uses 
`IcebergTableScan::new` (lazy, single-partition). Static snapshots do not 
benefit from eager planning because the task list is fixed by construction.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

