toutane commented on code in PR #2298:
URL: https://github.com/apache/iceberg-rust/pull/2298#discussion_r3419275995
##########
crates/integrations/datafusion/src/table/mod.rs:
##########
@@ -124,26 +128,93 @@ impl TableProvider for IcebergTableProvider {
async fn scan(
&self,
- _state: &dyn Session,
+ state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
- // Load fresh table metadata from catalog
+ // Second load: fetch the latest snapshot so scans always reflect
current table state.
let table = self
.catalog
.load_table(&self.table_ident)
.await
.map_err(to_datafusion_error)?;
- // Create scan with fresh metadata (always use current snapshot)
- Ok(Arc::new(IcebergTableScan::new(
+ // Build a TableScan mirroring the inputs we'll hand to
IcebergTableScan,
+ // so plan_files() uses the same projection/filters the scan will
replay in execute().
+ let col_names = projection.map(|indices| {
+ indices
+ .iter()
+ .map(|&i| self.schema.field(i).name().clone())
+ .collect::<Vec<_>>()
+ });
+
+ let predicate = convert_filters_to_predicate(filters);
Review Comment:
I updated this so that the eager path resolves the scan inputs only once.
The catalog-backed planning path now builds a `TableScanConfig` once
(including the `Option<Predicate>`) and reuses that same config both to build
the Iceberg `TableScan` for `plan_files()` and to construct the final scan node
with its tasks. This avoids re-running `convert_filters_to_predicate(filters)` on
the eager path.
The lazy path still converts the filters exactly once, when building the scan
node, and `execute(partition)` uses the stored predicate directly.
##########
crates/integrations/datafusion/src/table/mod.rs:
##########
@@ -124,26 +128,93 @@ impl TableProvider for IcebergTableProvider {
async fn scan(
&self,
- _state: &dyn Session,
+ state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
- // Load fresh table metadata from catalog
+ // Second load: fetch the latest snapshot so scans always reflect
current table state.
let table = self
.catalog
.load_table(&self.table_ident)
.await
.map_err(to_datafusion_error)?;
- // Create scan with fresh metadata (always use current snapshot)
- Ok(Arc::new(IcebergTableScan::new(
+ // Build a TableScan mirroring the inputs we'll hand to
IcebergTableScan,
+ // so plan_files() uses the same projection/filters the scan will
replay in execute().
+ let col_names = projection.map(|indices| {
+ indices
+ .iter()
+ .map(|&i| self.schema.field(i).name().clone())
+ .collect::<Vec<_>>()
+ });
+
+ let predicate = convert_filters_to_predicate(filters);
+
+ let mut builder = table.scan();
Review Comment:
Agreed. I refactored this so that the planning and execute paths share the same
`IcebergTableScanBuilder` logic.
The builder now centralizes snapshot handling, projection-to-column-name
mapping, and predicate conversion in `build_table_scan()`. The catalog-backed
eager planning path uses that helper for `plan_files()` and then reuses the same
builder to construct the final `IcebergTableScan`.
I also added a regression test for an eager catalog-backed scan with both a
projection and a filter, so this path is protected against future drift.
##########
crates/integrations/datafusion/src/table/mod.rs:
##########
@@ -124,26 +128,93 @@ impl TableProvider for IcebergTableProvider {
async fn scan(
&self,
- _state: &dyn Session,
+ state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
- // Load fresh table metadata from catalog
+ // Second load: fetch the latest snapshot so scans always reflect
current table state.
let table = self
.catalog
.load_table(&self.table_ident)
.await
.map_err(to_datafusion_error)?;
- // Create scan with fresh metadata (always use current snapshot)
- Ok(Arc::new(IcebergTableScan::new(
+ // Build a TableScan mirroring the inputs we'll hand to
IcebergTableScan,
+ // so plan_files() uses the same projection/filters the scan will
replay in execute().
+ let col_names = projection.map(|indices| {
+ indices
+ .iter()
+ .map(|&i| self.schema.field(i).name().clone())
+ .collect::<Vec<_>>()
+ });
+
+ let predicate = convert_filters_to_predicate(filters);
+
+ let mut builder = table.scan();
+ builder = match col_names {
+ Some(names) => builder.select(names),
+ None => builder.select_all(),
+ };
+ if let Some(pred) = predicate {
+ builder = builder.with_filter(pred);
+ }
+
+ let tasks: Vec<FileScanTask> = builder
+ .build()
+ .map_err(to_datafusion_error)?
+ .plan_files()
+ .await
+ .map_err(to_datafusion_error)?
+ .try_collect::<Vec<_>>()
+ .await
+ .map_err(to_datafusion_error)?;
+
+ // Output schema after projection: column indices in `Hash` exprs and
any
+ // Arrow array we hash must reference this schema, not the full table
schema.
+ let output_schema = match projection {
+ None => self.schema.clone(),
+ Some(p) => Arc::new(self.schema.project(p).map_err(|e| {
+ to_datafusion_error(Error::new(ErrorKind::DataInvalid,
e.to_string()))
+ })?),
+ };
+
+ let target_partitions = state.config().target_partitions();
+ // Always produce at least 1 partition so that DataFusion can schedule
+ // the plan normally and callers can safely call execute(0). An empty
+ // bucket simply yields an empty record-batch stream.
+ let n_partitions = target_partitions.min(tasks.len()).max(1);
+
+ // identity_cols is Some(non-empty) iff every condition for declaring
+ // Partitioning::Hash is met: the table's default spec has
identity-transform
+ // fields, every such source column is present in the output
projection, and
+ // every column type is supported by literal_to_array. Any miss
collapses to
+ // None, which forces UnknownPartitioning regardless of bucketing
strategy.
+ let identity_cols = bucketing::compute_identity_cols(&table,
&output_schema);
+
+ let (buckets, all_had_full_key) =
+ bucketing::bucket_tasks(tasks, n_partitions,
identity_cols.as_deref());
+
+ let partitioning = match identity_cols {
+ Some(cols) if !cols.is_empty() && all_had_full_key && n_partitions
> 0 => {
Review Comment:
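Done: added a short-circuit to `UnknownPartitioning` when `task_count == 0`, and
removed the redundant `n_partitions > 0` check. That check is always true because
`n_partitions` is clamped to at least 1 via `.max(1)`.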
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]