toutane commented on code in PR #2298:
URL: https://github.com/apache/iceberg-rust/pull/2298#discussion_r3419273013
##########
crates/integrations/datafusion/src/physical_plan/scan.rs:
##########
@@ -31,46 +30,113 @@ use datafusion::physical_plan::{DisplayAs, ExecutionPlan,
Partitioning, PlanProp
use datafusion::prelude::Expr;
use futures::{Stream, TryStreamExt};
use iceberg::expr::Predicate;
+use iceberg::scan::{FileScanTask, TableScan};
use iceberg::table::Table;
use super::expr_to_predicate::convert_filters_to_predicate;
use crate::to_datafusion_error;
-/// Manages the scanning process of an Iceberg [`Table`], encapsulating the
-/// necessary details and computed properties required for execution planning.
+/// Iceberg [`Table`] scan as a DataFusion [`ExecutionPlan`].
+///
+/// Has two construction modes: [`IcebergTableScan::new`] for a lazy
+/// single-partition scan, and [`IcebergTableScan::new_with_tasks`] for an
+/// eager multi-partition scan over pre-planned [`FileScanTask`] buckets.
+///
+/// Note: in eager mode the underlying `TableScan` is rebuilt on every
+/// `execute(partition)` call. The per-build cost is bounded (no I/O) and
+/// keeps the plan free of `Arc`-shared evaluator caches that are awkward to
+/// serialize across workers.
#[derive(Debug)]
pub struct IcebergTableScan {
/// A table in the catalog.
table: Table,
/// Snapshot of the table to scan.
snapshot_id: Option<i64>,
- /// Stores certain, often expensive to compute,
- /// plan properties used in query optimization.
+ /// Cached plan properties used by query optimization.
plan_properties: Arc<PlanProperties>,
- /// Projection column names, None means all columns
+ /// Projection column names, None means all columns.
projection: Option<Vec<String>>,
- /// Filters to apply to the table scan
+ /// Filters to apply to the table scan.
predicates: Option<Predicate>,
- /// Optional limit on the number of rows to return
+ /// Pre-planned file scan tasks per partition (eager mode), or `None` (lazy mode).
+ buckets: Option<Vec<Vec<FileScanTask>>>,
Review Comment:
That is indeed misleading.
I think keeping `Option` is better, as it lets us distinguish the case where
we've already planned but got `0` tasks from the case where we haven't planned
yet. I changed the accessor accordingly, so it now returns an `Option`.
I also added a comment above the struct field to make explicit that `None`
means lazy mode and `Some` means eager mode.
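
For readers following along, here is a minimal sketch of why the `Option` matters. It is not the PR's code: `Task`, `Scan`, `lazy`, `eager`, and `describe` are hypothetical stand-ins, and only the `None` = lazy / `Some` = eager convention comes from the comment above.

```rust
/// Hypothetical stand-in for `iceberg::scan::FileScanTask`.
#[derive(Debug, Clone, PartialEq)]
pub struct Task(pub String);

/// Hypothetical, trimmed-down scan node that keeps only the `buckets` field.
pub struct Scan {
    /// `None` = lazy mode (not planned yet), `Some` = eager mode (already planned).
    buckets: Option<Vec<Vec<Task>>>,
}

impl Scan {
    pub fn lazy() -> Self {
        Self { buckets: None }
    }

    pub fn eager(buckets: Vec<Vec<Task>>) -> Self {
        Self { buckets: Some(buckets) }
    }

    /// Returns an `Option` so callers can tell "not planned" from "planned, empty".
    pub fn buckets(&self) -> Option<&[Vec<Task>]> {
        self.buckets.as_deref()
    }

    pub fn describe(&self) -> String {
        match self.buckets() {
            None => "lazy: not planned yet".to_string(),
            Some(b) => {
                // Count tasks across all partitions.
                let tasks: usize = b.iter().map(Vec::len).sum();
                format!("eager: {} partition(s), {} task(s)", b.len(), tasks)
            }
        }
    }
}
```

With a plain `Vec`, `Scan::lazy()` and `Scan::eager(vec![])` would be indistinguishable; with the `Option`, the first reports "not planned yet" and the second reports zero partitions and zero tasks.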
##########
crates/integrations/datafusion/src/physical_plan/scan.rs:
##########
@@ -31,46 +30,113 @@ use datafusion::physical_plan::{DisplayAs, ExecutionPlan,
Partitioning, PlanProp
use datafusion::prelude::Expr;
use futures::{Stream, TryStreamExt};
use iceberg::expr::Predicate;
+use iceberg::scan::{FileScanTask, TableScan};
use iceberg::table::Table;
use super::expr_to_predicate::convert_filters_to_predicate;
use crate::to_datafusion_error;
-/// Manages the scanning process of an Iceberg [`Table`], encapsulating the
-/// necessary details and computed properties required for execution planning.
+/// Iceberg [`Table`] scan as a DataFusion [`ExecutionPlan`].
+///
+/// Has two construction modes: [`IcebergTableScan::new`] for a lazy
+/// single-partition scan, and [`IcebergTableScan::new_with_tasks`] for an
+/// eager multi-partition scan over pre-planned [`FileScanTask`] buckets.
+///
+/// Note: in eager mode the underlying `TableScan` is rebuilt on every
+/// `execute(partition)` call. The per-build cost is bounded (no I/O) and
+/// keeps the plan free of `Arc`-shared evaluator caches that are awkward to
+/// serialize across workers.
#[derive(Debug)]
pub struct IcebergTableScan {
/// A table in the catalog.
table: Table,
/// Snapshot of the table to scan.
snapshot_id: Option<i64>,
- /// Stores certain, often expensive to compute,
- /// plan properties used in query optimization.
+ /// Cached plan properties used by query optimization.
plan_properties: Arc<PlanProperties>,
- /// Projection column names, None means all columns
+ /// Projection column names, None means all columns.
projection: Option<Vec<String>>,
- /// Filters to apply to the table scan
+ /// Filters to apply to the table scan.
predicates: Option<Predicate>,
- /// Optional limit on the number of rows to return
+ /// Pre-planned file scan tasks per partition (eager mode), or `None` (lazy mode).
+ buckets: Option<Vec<Vec<FileScanTask>>>,
+ /// Optional limit on the number of rows to return.
limit: Option<usize>,
}
impl IcebergTableScan {
- /// Creates a new [`IcebergTableScan`] object.
- pub(crate) fn new(
+ /// Creates a lazy single-partition scan that plans and reads all tasks
+ /// inside `execute(0)`. Used by
+ /// [`IcebergStaticTableProvider`][crate::table::IcebergStaticTableProvider].
+ pub fn new(
+ table: Table,
+ snapshot_id: Option<i64>,
+ schema: ArrowSchemaRef,
+ projection: Option<&Vec<usize>>,
+ filters: &[Expr],
+ limit: Option<usize>,
+ ) -> Self {
+ Self::new_inner(
+ table,
+ snapshot_id,
+ schema,
+ projection,
+ filters,
+ limit,
+ Partitioning::UnknownPartitioning(1),
+ None,
+ )
+ }
+
+ /// Creates an eager multi-partition scan over pre-planned task buckets.
+ /// Partition `i` streams `buckets[i]`. The caller is responsible for
+ /// ensuring `partitioning` matches the bucketing. Used by
+ /// [`IcebergTableProvider`][crate::table::IcebergTableProvider].
+ #[allow(clippy::too_many_arguments)]
Review Comment:
You're right, this could let a user build an inconsistent node.
I added a `pub struct IcebergTableScanBuilder` that returns an error if
`buckets.len() != partition_count`. This also removes the duplication between
`new`, `new_with_tasks`, and `new_inner`.
I'm interested in keeping the ability to build an `IcebergTableScan` from
outside, which is why `new` and `new_with_tasks` were previously `pub`. We
currently rely on a catalog and schema provider other than the ones in
iceberg-datafusion. This is also something other users have asked for, as
mentioned in this proposal: https://github.com/apache/iceberg-rust/pull/2123.
For that reason, I've kept `build` as `pub`.
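
To illustrate the shape of that check, here is a rough sketch under stated assumptions. It is not the actual `IcebergTableScanBuilder`: `Task`, `Scan`, `ScanBuilder`, `BuildError`, and the `with_*` setters are hypothetical names, and the real builder carries more fields (table, schema, filters, limit).

```rust
/// Hypothetical stand-in for `iceberg::scan::FileScanTask`.
#[derive(Debug, Clone, PartialEq)]
pub struct Task(pub String);

/// Hypothetical, trimmed-down scan node.
#[derive(Debug, PartialEq)]
pub struct Scan {
    partition_count: usize,
    buckets: Option<Vec<Vec<Task>>>,
}

/// Hypothetical error type; the real builder would likely use the crate's own error.
#[derive(Debug, PartialEq)]
pub enum BuildError {
    BucketCountMismatch { buckets: usize, partition_count: usize },
}

#[derive(Default)]
pub struct ScanBuilder {
    partition_count: Option<usize>,
    buckets: Option<Vec<Vec<Task>>>,
}

impl ScanBuilder {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn with_partition_count(mut self, n: usize) -> Self {
        self.partition_count = Some(n);
        self
    }

    pub fn with_buckets(mut self, buckets: Vec<Vec<Task>>) -> Self {
        self.buckets = Some(buckets);
        self
    }

    /// Rejects an eager scan whose bucket count disagrees with its partitioning.
    pub fn build(self) -> Result<Scan, BuildError> {
        match self.buckets {
            // Lazy mode: assumed here to always be a single partition.
            None => Ok(Scan { partition_count: 1, buckets: None }),
            Some(buckets) => {
                // Assumption: default to one partition per bucket when unset.
                let partition_count = self.partition_count.unwrap_or(buckets.len());
                if buckets.len() != partition_count {
                    return Err(BuildError::BucketCountMismatch {
                        buckets: buckets.len(),
                        partition_count,
                    });
                }
                Ok(Scan { partition_count, buckets: Some(buckets) })
            }
        }
    }
}
```

Because the only public entry point is `build`, an external catalog or schema provider can still construct the node, but cannot produce one where partition `i` has no matching `buckets[i]`.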
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]