mbutrovich commented on code in PR #2298:
URL: https://github.com/apache/iceberg-rust/pull/2298#discussion_r3404452333


##########
crates/iceberg/src/scan/mod.rs:
##########
@@ -433,6 +433,21 @@ impl TableScan {
 
     /// Returns an [`ArrowRecordBatchStream`].
     pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
+        self.to_arrow_from_tasks(self.plan_files().await?)
+    }
+
+    /// Like [`TableScan::to_arrow`], but accepts a caller-supplied
+    /// [`FileScanTask`] stream instead of running [`TableScan::plan_files`]
+    /// internally.
+    ///
+    /// # Correctness
+    ///
+    /// Tasks must come from a [`TableScan`] with the same projection and
+    /// filter as `self`: predicates are baked into each task at planning
+    /// time and are not re-applied here. Reader-side configuration
+    /// (concurrency, batch size, row-group filtering, row selection) is
+    /// taken from `self` and may differ from the planning scan.
+    pub fn to_arrow_from_tasks(&self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {

Review Comment:
   This is now public on the core `TableScan`, and the only thing keeping it 
correct is the doc-comment contract that the caller's tasks were planned with 
the same projection/filter as `self`. Since the predicate is baked into the 
tasks and isn't re-derived here, passing tasks from a differently-filtered scan 
would silently return wrong rows with no error. Could we keep this `pub(crate)` 
(or behind the integration) until there's a concrete external need? If it does 
need to be public, is there a way to make the type system carry the invariant 
(*e.g.*, consuming a wrapper that proves the tasks came from this scan) rather 
than relying on the doc?
   
   Separately: as written, the body just copies five reader-config fields off 
`self` (`file_io`, `concurrency_limit_data_files`, 
`row_group_filtering_enabled`, `row_selection_enabled`, `batch_size`) onto an 
`ArrowReaderBuilder` and calls `read(tasks)`; nothing in it needs the `Table`. 
And `ArrowReaderBuilder` is already public and `Table`-free, so the "execute 
pre-planned tasks" capability already exists for external callers: 
datafusion-comet's `IcebergScanExec`, which holds only a `metadata_location` 
and JVM-planned tasks (no `Table`), builds its reader that way today. So this 
method doesn't unlock anything new for downstream engines; its only value is 
reusing a `TableScan`'s already-configured reader settings. Given that, does it 
earn `pub` on the core `TableScan`, or is `pub(crate)` enough? Callers without 
a `TableScan` are better served by `ArrowReaderBuilder` directly.
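
   One way the "wrapper that proves the tasks came from this scan" idea could 
look is sketched below. Everything in it is hypothetical: `Scan`, `Task`, and 
`PlannedTasks` are simplified stand-ins, not iceberg-rust types, and the check 
is a runtime id comparison backed by field privacy rather than a pure 
compile-time proof.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical stand-in for `FileScanTask`; only the path matters here.
#[derive(Debug, Clone, PartialEq)]
pub struct Task {
    pub path: String,
}

/// Source of unique ids so each scan can recognise its own plan.
static NEXT_SCAN_ID: AtomicU64 = AtomicU64::new(0);

/// Hypothetical stand-in for `TableScan`.
pub struct Scan {
    id: u64,
    filter: String,
}

/// Tasks tagged with the id of the scan that planned them. The fields are
/// private, so code outside this module can only obtain one via `Scan::plan`.
pub struct PlannedTasks {
    scan_id: u64,
    tasks: Vec<Task>,
}

impl Scan {
    pub fn new(filter: &str) -> Self {
        Scan {
            id: NEXT_SCAN_ID.fetch_add(1, Ordering::Relaxed),
            filter: filter.to_string(),
        }
    }

    /// Planning bakes the filter into the tasks and records who planned them.
    pub fn plan(&self) -> PlannedTasks {
        PlannedTasks {
            scan_id: self.id,
            tasks: vec![Task {
                path: format!("data/{}.parquet", self.filter),
            }],
        }
    }

    /// Reading rejects a plan produced by any other scan instead of silently
    /// returning rows filtered by a different predicate.
    pub fn read(&self, planned: PlannedTasks) -> Result<Vec<Task>, String> {
        if planned.scan_id != self.id {
            return Err(format!(
                "tasks were planned by scan {}, not scan {}",
                planned.scan_id, self.id
            ));
        }
        Ok(planned.tasks)
    }
}
```

   A real version would also need a way to split a `PlannedTasks` into buckets 
without losing the tag, which this sketch leaves out.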



##########
crates/integrations/datafusion/src/table/bucketing.rs:
##########
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use datafusion::arrow::array::{
+    ArrayRef, BooleanArray, Date32Array, Float32Array, Float64Array, Int32Array, Int64Array,
+    StringArray,
+};
+use datafusion::arrow::datatypes::{DataType, Schema as ArrowSchema};
+use datafusion::common::hash_utils::create_hashes;
+use datafusion::physical_plan::repartition::REPARTITION_RANDOM_STATE;
+use iceberg::scan::FileScanTask;
+use iceberg::spec::{Literal, PrimitiveLiteral, Transform};
+use iceberg::table::Table;
+
+/// Identity-partitioned column that is also present in the output projection
+/// and whose Arrow type can be reconstructed from a `Literal` for hashing.
+pub(super) struct IdentityCol {
+    pub(super) name: String,
+    /// Position of this column in the *output* schema (after projection).
+    pub(super) output_idx: usize,
+    /// Position of this column inside the partition spec's `fields()` slice,
+    /// matching the slot order of `FileScanTask::partition`.
+    pub(super) spec_field_idx: usize,
+    pub(super) output_dtype: DataType,
+}
+
+/// Inspect the table's default partition spec and return the list of identity
+/// columns that can support a [`Partitioning::Hash`] declaration. Returns
+/// `None` if any condition is violated:
+///   - the source column for an identity field is not in the output projection
+///   - the source column's Arrow type is not currently supported by
+///     [`literal_to_array`]
+///   - the table has spec evolution (>1 historical specs), since older files
+///     may carry a partition tuple that does not align with the default spec
+///
+/// Returning `None` forces the scan to declare `UnknownPartitioning` even if
+/// bucketing succeeds.
+pub(super) fn compute_identity_cols(
+    table: &Table,
+    output_schema: &ArrowSchema,
+) -> Option<Vec<IdentityCol>> {
+    let metadata = table.metadata();
+    if metadata.partition_specs_iter().len() > 1 {
+        return None;
+    }
+    let spec = metadata.default_partition_spec();
+    let table_schema = metadata.current_schema();
+
+    let mut cols = Vec::new();
+    for (spec_field_idx, pf) in spec.fields().iter().enumerate() {
+        if pf.transform != Transform::Identity {
+            continue;
+        }
+        let source_field = table_schema.field_by_id(pf.source_id)?;
+        let output_idx = output_schema.index_of(source_field.name.as_str()).ok()?;
+        let output_dtype = output_schema.field(output_idx).data_type().clone();
+        if !is_supported_dtype(&output_dtype) {
+            return None;
+        }
+        cols.push(IdentityCol {
+            name: source_field.name.clone(),
+            output_idx,
+            spec_field_idx,
+            output_dtype,
+        });
+    }
+    Some(cols)
+}
+
+fn is_supported_dtype(dt: &DataType) -> bool {
+    matches!(
+        dt,
+        DataType::Boolean
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+            | DataType::Utf8
+            | DataType::Date32
+    )
+}
+
+/// Distribute `tasks` across `n_partitions` buckets. When `identity_cols`
+/// describes a non-empty, hashable identity key, each task is hashed on
+/// that key using DataFusion's repartition hash so the resulting partitioning
+/// matches what `RepartitionExec` would produce on the same data. Tasks
+/// missing partition data fall back to hashing `data_file_path`, which still
+/// distributes evenly but breaks the `Hash` contract — the second tuple
+/// element flags whether every task supplied a full identity key.
+pub(super) fn bucket_tasks(

Review Comment:
   Tasks are distributed purely by `hash % n_partitions`, with no awareness of 
`file_size_in_bytes`. A table with one large file plus many small ones would 
concentrate most of the work in a single bucket, so the query would effectively 
serialize on that partition. iceberg-java 
does size-based bin-packing here (`TableScanUtil.planTaskGroups` + 
`BinPacking`, weighted by file/delete size with a target split size), which is 
also what #128 is asking for. I'm fine with count-based as a first cut, but 
could we at least note the limitation in the module doc and file/track the 
size-based follow-up? The size field is already on `FileScanTask`, so 
first-fit-decreasing on bytes would be a fairly contained extension.
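
   A minimal sketch of what a size-aware follow-up might look like. It uses a 
greedy "largest file first, into the currently lightest bucket" rule, which is a 
close relative of first-fit-decreasing for a fixed bucket count, not 
iceberg-java's `BinPacking`. `SizedTask` is a hypothetical stand-in for 
`FileScanTask`, delete-file sizes are ignored, and because the assignment 
ignores the partition key it would only apply where the scan declares 
`UnknownPartitioning`.

```rust
/// Hypothetical stand-in for `FileScanTask`, keeping only what we need.
#[derive(Debug, Clone, PartialEq)]
pub struct SizedTask {
    pub path: String,
    pub size_bytes: u64,
}

/// Assign tasks to `n_buckets` buckets so that byte totals stay balanced.
/// Tasks are visited largest first and each goes to the bucket with the
/// smallest running total (ties resolve to the lowest bucket index).
pub fn bucket_by_size(mut tasks: Vec<SizedTask>, n_buckets: usize) -> Vec<Vec<SizedTask>> {
    let n = n_buckets.max(1);
    let mut buckets: Vec<Vec<SizedTask>> = vec![Vec::new(); n];
    let mut loads = vec![0u64; n];

    // Largest first, so big files are placed before the small ones fill gaps.
    tasks.sort_by(|a, b| b.size_bytes.cmp(&a.size_bytes));

    for task in tasks {
        let (idx, _) = loads
            .iter()
            .enumerate()
            .min_by_key(|(_, load)| **load)
            .expect("n is at least 1");
        loads[idx] += task.size_bytes;
        buckets[idx].push(task);
    }
    buckets
}
```

   With one 100-byte file, four 10-byte files, and two buckets, the large file 
gets a bucket to itself and the four small files share the other.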



##########
crates/integrations/datafusion/src/table/mod.rs:
##########
@@ -124,26 +128,93 @@ impl TableProvider for IcebergTableProvider {
 
     async fn scan(
         &self,
-        _state: &dyn Session,
+        state: &dyn Session,
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
         limit: Option<usize>,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
-        // Load fresh table metadata from catalog
+        // Second load: fetch the latest snapshot so scans always reflect current table state.
         let table = self
             .catalog
             .load_table(&self.table_ident)
             .await
             .map_err(to_datafusion_error)?;
 
-        // Create scan with fresh metadata (always use current snapshot)
-        Ok(Arc::new(IcebergTableScan::new(
+        // Build a TableScan mirroring the inputs we'll hand to IcebergTableScan,
+        // so plan_files() uses the same projection/filters the scan will replay in execute().
+        let col_names = projection.map(|indices| {
+            indices
+                .iter()
+                .map(|&i| self.schema.field(i).name().clone())
+                .collect::<Vec<_>>()
+        });
+
+        let predicate = convert_filters_to_predicate(filters);
+
+        let mut builder = table.scan();

Review Comment:
   This `select`/`select_all` + `with_filter` + `build` sequence (and the 
`col_names` projection->names map just above) looks like it re-implements 
`build_table_scan` and `get_column_names` in `physical_plan/scan.rs`. Could 
these share one helper so the planning path and the execute path can't drift 
apart on, say, snapshot handling?



##########
crates/integrations/datafusion/src/table/mod.rs:
##########
@@ -124,26 +128,93 @@ impl TableProvider for IcebergTableProvider {
 
     async fn scan(
         &self,
-        _state: &dyn Session,
+        state: &dyn Session,
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
         limit: Option<usize>,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
-        // Load fresh table metadata from catalog
+        // Second load: fetch the latest snapshot so scans always reflect current table state.
         let table = self
             .catalog
             .load_table(&self.table_ident)
             .await
             .map_err(to_datafusion_error)?;
 
-        // Create scan with fresh metadata (always use current snapshot)
-        Ok(Arc::new(IcebergTableScan::new(
+        // Build a TableScan mirroring the inputs we'll hand to IcebergTableScan,
+        // so plan_files() uses the same projection/filters the scan will replay in execute().
+        let col_names = projection.map(|indices| {
+            indices
+                .iter()
+                .map(|&i| self.schema.field(i).name().clone())
+                .collect::<Vec<_>>()
+        });
+
+        let predicate = convert_filters_to_predicate(filters);
+
+        let mut builder = table.scan();
+        builder = match col_names {
+            Some(names) => builder.select(names),
+            None => builder.select_all(),
+        };
+        if let Some(pred) = predicate {
+            builder = builder.with_filter(pred);
+        }
+
+        let tasks: Vec<FileScanTask> = builder
+            .build()
+            .map_err(to_datafusion_error)?
+            .plan_files()
+            .await
+            .map_err(to_datafusion_error)?
+            .try_collect::<Vec<_>>()
+            .await
+            .map_err(to_datafusion_error)?;
+
+        // Output schema after projection: column indices in `Hash` exprs and any
+        // Arrow array we hash must reference this schema, not the full table schema.
+        let output_schema = match projection {
+            None => self.schema.clone(),
+            Some(p) => Arc::new(self.schema.project(p).map_err(|e| {
+                to_datafusion_error(Error::new(ErrorKind::DataInvalid, e.to_string()))
+            })?),
+        };
+
+        let target_partitions = state.config().target_partitions();
+        // Always produce at least 1 partition so that DataFusion can schedule
+        // the plan normally and callers can safely call execute(0). An empty
+        // bucket simply yields an empty record-batch stream.
+        let n_partitions = target_partitions.min(tasks.len()).max(1);
+
+        // identity_cols is Some(non-empty) iff every condition for declaring
+        // Partitioning::Hash is met: the table's default spec has identity-transform
+        // fields, every such source column is present in the output projection, and
+        // every column type is supported by literal_to_array. Any miss collapses to
+        // None, which forces UnknownPartitioning regardless of bucketing strategy.
+        let identity_cols = bucketing::compute_identity_cols(&table, &output_schema);
+
+        let (buckets, all_had_full_key) =
+            bucketing::bucket_tasks(tasks, n_partitions, identity_cols.as_deref());
+
+        let partitioning = match identity_cols {
+            Some(cols) if !cols.is_empty() && all_had_full_key && n_partitions > 0 => {

Review Comment:
   Since `n_partitions = target_partitions.min(tasks.len()).max(1)` a few lines 
up, isn't this always >= 1? If so the `&& n_partitions > 0` can be dropped. 
(And for an empty partitioned table this branch would declare `Hash(exprs, 1)` 
since `all_had_full_key` stays true with no tasks, which is harmless, but a 
`tasks.is_empty()` short-circuit to `Unknown` might read clearer.)
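
   The clamp can be checked in isolation. The helper below is a hypothetical 
extraction of the expression quoted above (the name `n_partitions` as a 
function is an assumption for illustration); the inputs mirror the bucket-count 
cases in the PR's tests.

```rust
/// Hypothetical helper wrapping the expression from `scan()`:
/// cap at the number of tasks, then floor at 1.
pub fn n_partitions(target_partitions: usize, task_count: usize) -> usize {
    target_partitions.min(task_count).max(1)
}
```

   Because `.max(1)` is applied last, the result is at least 1 for every input, 
including an empty task list or a `target_partitions` of 0.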



##########
crates/integrations/datafusion/src/table/bucketing.rs:
##########
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use datafusion::arrow::array::{
+    ArrayRef, BooleanArray, Date32Array, Float32Array, Float64Array, Int32Array, Int64Array,
+    StringArray,
+};
+use datafusion::arrow::datatypes::{DataType, Schema as ArrowSchema};
+use datafusion::common::hash_utils::create_hashes;
+use datafusion::physical_plan::repartition::REPARTITION_RANDOM_STATE;
+use iceberg::scan::FileScanTask;
+use iceberg::spec::{Literal, PrimitiveLiteral, Transform};
+use iceberg::table::Table;
+
+/// Identity-partitioned column that is also present in the output projection
+/// and whose Arrow type can be reconstructed from a `Literal` for hashing.
+pub(super) struct IdentityCol {
+    pub(super) name: String,
+    /// Position of this column in the *output* schema (after projection).
+    pub(super) output_idx: usize,
+    /// Position of this column inside the partition spec's `fields()` slice,
+    /// matching the slot order of `FileScanTask::partition`.
+    pub(super) spec_field_idx: usize,
+    pub(super) output_dtype: DataType,
+}
+
+/// Inspect the table's default partition spec and return the list of identity
+/// columns that can support a [`Partitioning::Hash`] declaration. Returns
+/// `None` if any condition is violated:
+///   - the source column for an identity field is not in the output projection
+///   - the source column's Arrow type is not currently supported by
+///     [`literal_to_array`]
+///   - the table has spec evolution (>1 historical specs), since older files
+///     may carry a partition tuple that does not align with the default spec
+///
+/// Returning `None` forces the scan to declare `UnknownPartitioning` even if
+/// bucketing succeeds.
+pub(super) fn compute_identity_cols(
+    table: &Table,
+    output_schema: &ArrowSchema,
+) -> Option<Vec<IdentityCol>> {
+    let metadata = table.metadata();
+    if metadata.partition_specs_iter().len() > 1 {

Review Comment:
   Returning `None` whenever there's more than one historical spec is safe but 
stricter than iceberg-java, which intersects the identity fields present across 
all specs (`Partitioning.groupingKeyType` / `commonActiveFieldIds`) and still 
reports a grouping key on the common columns. I see the history tried the 
per-column intersection and reverted it as out of scope, which is totally 
reasonable for this PR. Could we add a one-line comment pointing at that 
behavior so the conservatism reads as intentional, and link a follow-up issue?
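
   For the follow-up issue, the intersection idea can be sketched roughly as 
below. This is an illustration of the concept only, not a port of iceberg-java: 
`SpecField` and `Xform` are hypothetical simplified types, and a real version 
would also have to read each file's partition tuple using the field order of 
that file's own spec.

```rust
use std::collections::BTreeSet;

/// Hypothetical, simplified transform: only identity vs. anything else.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Xform {
    Identity,
    Other,
}

/// Hypothetical, simplified partition field.
pub struct SpecField {
    pub source_id: i32,
    pub transform: Xform,
}

/// Source column ids that are identity-partitioned in every spec.
/// Returns an empty set when there are no specs at all.
pub fn common_identity_source_ids(specs: &[Vec<SpecField>]) -> BTreeSet<i32> {
    // Identity source ids of each spec, one set per spec.
    let mut per_spec = specs.iter().map(|fields| {
        fields
            .iter()
            .filter(|f| f.transform == Xform::Identity)
            .map(|f| f.source_id)
            .collect::<BTreeSet<i32>>()
    });

    let first = match per_spec.next() {
        Some(set) => set,
        None => return BTreeSet::new(),
    };
    // Keep only ids present in every later spec as well.
    per_spec.fold(first, |acc, set| acc.intersection(&set).copied().collect())
}
```

   A column that is identity-partitioned in one spec but bucketed or dropped in 
another falls out of the result, so only columns common to all specs could back 
a `Hash` declaration.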



##########
crates/integrations/datafusion/src/physical_plan/scan.rs:
##########
@@ -187,57 +255,107 @@ impl DisplayAs for IcebergTableScan {
         _t: datafusion::physical_plan::DisplayFormatType,
         f: &mut std::fmt::Formatter,
     ) -> std::fmt::Result {
+        let projection = self
+            .projection
+            .as_deref()
+            .map_or(String::new(), |v| v.join(","));
+        let predicate = self
+            .predicates
+            .as_ref()
+            .map_or(String::new(), |p| p.to_string());
+
         write!(
             f,
-            "IcebergTableScan projection:[{}] predicate:[{}]",
-            self.projection
-                .clone()
-                .map_or(String::new(), |v| v.join(",")),
-            self.predicates
-                .clone()
-                .map_or(String::from(""), |p| format!("{p}"))
+            "{} projection:[{projection}] predicate:[{predicate}]",
+            self.name()
         )?;
+        if let Some(buckets) = &self.buckets {
+            let file_count = self.total_file_count();
+            let bucket_count = buckets.len();
+            write!(f, " buckets:[{bucket_count}] file_count:[{file_count}]")?;
+        }
         if let Some(limit) = self.limit {
             write!(f, " limit:[{limit}]")?;
         }
         Ok(())
     }
 }
 
-/// Asynchronously retrieves a stream of [`RecordBatch`] instances
-/// from a given table.
-///
-/// This function initializes a [`TableScan`], builds it,
-/// and then converts it into a stream of Arrow [`RecordBatch`]es.
-async fn get_batch_stream(
+fn build_table_scan(
     table: Table,
     snapshot_id: Option<i64>,
     column_names: Option<Vec<String>>,
     predicates: Option<Predicate>,
-) -> DFResult<Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>> {
+) -> DFResult<TableScan> {
     let scan_builder = match snapshot_id {
-        Some(snapshot_id) => table.scan().snapshot_id(snapshot_id),
+        Some(id) => table.scan().snapshot_id(id),
         None => table.scan(),
     };
-
     let mut scan_builder = match column_names {
-        Some(column_names) => scan_builder.select(column_names),
+        Some(names) => scan_builder.select(names),
         None => scan_builder.select_all(),
     };
     if let Some(pred) = predicates {
         scan_builder = scan_builder.with_filter(pred);
     }
-    let table_scan = scan_builder.build().map_err(to_datafusion_error)?;
+    scan_builder.build().map_err(to_datafusion_error)
+}
+
+/// Builds the `RecordBatch` stream for a single partition. When `bucket` is
+/// `Some`, streams the pre-planned tasks via `to_arrow_from_tasks`; when
+/// `None`, plans and reads the full scan via `to_arrow`.
+async fn build_record_batch_stream(

Review Comment:
   Both arms repeat `.map_err(to_datafusion_error)? ... 
.map_err(to_datafusion_error)`. Could we produce the inner stream in the 
`match` and apply the error mapping once afterward to cut the duplication? (No 
behavior change intended.)
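
   Roughly the shape I have in mind, as a self-contained sketch. All names here 
are hypothetical stand-ins: `CoreError`/`EngineError` play the roles of the 
iceberg and DataFusion error types, a `Vec` of results plays the role of the 
stream, and `Source` models the `Some(bucket)` / `None` split.

```rust
#[derive(Debug, PartialEq)]
pub struct CoreError(pub String);

#[derive(Debug, PartialEq)]
pub struct EngineError(pub String);

/// Stand-in for `to_datafusion_error`.
pub fn to_engine_error(e: CoreError) -> EngineError {
    EngineError(format!("engine: {}", e.0))
}

/// Stand-in for a record-batch stream whose items can fail individually.
type CoreStream = Vec<Result<i32, CoreError>>;

pub enum Source {
    /// Pre-planned tasks for one bucket.
    Bucket(Vec<i32>),
    /// Plan and read the full scan; `plannable` simulates planning failure.
    Full { plannable: bool },
}

fn read_bucket(ids: Vec<i32>) -> Result<CoreStream, CoreError> {
    Ok(ids
        .into_iter()
        .map(|id| {
            if id < 0 {
                Err(CoreError(format!("bad task {id}")))
            } else {
                Ok(id)
            }
        })
        .collect())
}

fn read_full(plannable: bool) -> Result<CoreStream, CoreError> {
    if plannable {
        Ok(vec![Ok(0)])
    } else {
        Err(CoreError("planning failed".to_string()))
    }
}

pub fn build_stream(source: Source) -> Result<Vec<Result<i32, EngineError>>, EngineError> {
    // Each arm only chooses how to produce the inner stream.
    let inner = match source {
        Source::Bucket(ids) => read_bucket(ids),
        Source::Full { plannable } => read_full(plannable),
    };
    // Error mapping happens once: first for the outer result, then per item.
    let stream = inner.map_err(to_engine_error)?;
    Ok(stream
        .into_iter()
        .map(|item| item.map_err(to_engine_error))
        .collect())
}
```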



##########
crates/integrations/datafusion/src/table/mod.rs:
##########
@@ -865,4 +933,382 @@ mod tests {
             "Limit should be None when not specified"
         );
     }
+
+    // ── Bucketed scan tests ──────────────────────────────────────────────────
+
+    async fn make_catalog_and_table_for_bucketing()
+    -> (Arc<dyn Catalog>, NamespaceIdent, String, tempfile::TempDir) {
+        use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
+        use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+        use iceberg::{CatalogBuilder, TableCreation};
+
+        let temp_dir = tempfile::TempDir::new().unwrap();
+        let warehouse = temp_dir.path().to_str().unwrap().to_string();
+
+        let catalog = Arc::new(
+            MemoryCatalogBuilder::default()
+                .load(
+                    "memory",
+                    std::collections::HashMap::from([(
+                        MEMORY_CATALOG_WAREHOUSE.to_string(),
+                        warehouse.clone(),
+                    )]),
+                )
+                .await
+                .unwrap(),
+        );
+
+        let namespace = NamespaceIdent::new("ns".to_string());
+        catalog
+            .create_namespace(&namespace, std::collections::HashMap::new())
+            .await
+            .unwrap();
+
+        let schema = Schema::builder()
+            .with_schema_id(0)
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
+            ])
+            .build()
+            .unwrap();
+
+        catalog
+            .create_table(
+                &namespace,
+                TableCreation::builder()
+                    .name("t".to_string())
+                    .location(format!("{warehouse}/t"))
+                    .schema(schema)
+                    .properties(std::collections::HashMap::new())
+                    .build(),
+            )
+            .await
+            .unwrap();
+
+        (catalog, namespace, "t".to_string(), temp_dir)
+    }
+
+    /// Registers `n` synthetic data files in the table metadata via the iceberg
+    /// transaction API. No actual parquet files are written, only the metadata
+    /// entries that `plan_files()` reads are created.
+    async fn append_fake_data_files(
+        catalog: &Arc<dyn Catalog>,
+        namespace: &NamespaceIdent,
+        table_name: &str,
+        n: usize,
+    ) {
+        use iceberg::spec::{DataContentType, DataFileBuilder, DataFileFormat};
+        use iceberg::transaction::{ApplyTransactionAction, Transaction};
+
+        let table = catalog
+            .load_table(&TableIdent::new(namespace.clone(), table_name.to_string()))
+            .await
+            .unwrap();
+
+        let data_files = (0..n)
+            .map(|i| {
+                DataFileBuilder::default()
+                    .content(DataContentType::Data)
+                    .file_path(format!(
+                        "{}/data/fake_{i}.parquet",
+                        table.metadata().location()
+                    ))
+                    .file_format(DataFileFormat::Parquet)
+                    .file_size_in_bytes(128)
+                    .record_count(1)
+                    .partition_spec_id(table.metadata().default_partition_spec_id())
+                    .build()
+                    .unwrap()
+            })
+            .collect::<Vec<_>>();
+
+        let tx = Transaction::new(&table);
+        let action = tx.fast_append().add_data_files(data_files);
+        action
+            .apply(tx)
+            .unwrap()
+            .commit(catalog.as_ref())
+            .await
+            .unwrap();
+    }
+
+    fn ctx_with_target_partitions(n: usize) -> SessionContext {
+        use datafusion::prelude::SessionConfig;
+        SessionContext::new_with_config(SessionConfig::new().with_target_partitions(n))
+    }
+
+    /// An empty table must produce a single empty-bucket scan so that DataFusion
+    /// can schedule the plan normally. execute(0) on an empty bucket simply
+    /// returns an empty record-batch stream.
+    #[tokio::test]
+    async fn test_empty_table_single_empty_bucket() {
+        let (catalog, namespace, table_name, _temp_dir) =
+            make_catalog_and_table_for_bucketing().await;
+        // no files appended
+        let provider = IcebergTableProvider::try_new(catalog, namespace, table_name)
+            .await
+            .unwrap();
+        let plan = provider
+            .scan(&ctx_with_target_partitions(8).state(), None, &[], None)
+            .await
+            .unwrap();
+        let scan = plan.as_any().downcast_ref::<IcebergTableScan>().unwrap();
+
+        assert_eq!(scan.buckets().len(), 1);
+        assert_eq!(scan.buckets()[0].len(), 0);
+        assert_eq!(scan.properties().partitioning.partition_count(), 1);
+    }
+
+    /// When the table has no identity-partition columns, every task takes the
+    /// fallback (file_path) bucket path, so the declaration must drop to
+    /// `UnknownPartitioning`. The bucket count should still equal
+    /// min(target_partitions, num_files).
+    #[tokio::test]
+    async fn test_unpartitioned_falls_back_to_unknown() {
+        use datafusion::physical_plan::Partitioning;
+
+        let (catalog, namespace, table_name, _temp_dir) =
+            make_catalog_and_table_for_bucketing().await;
+        append_fake_data_files(&catalog, &namespace, &table_name, 5).await;
+
+        let provider = IcebergTableProvider::try_new(catalog, namespace, table_name)
+            .await
+            .unwrap();
+        let plan = provider
+            .scan(&ctx_with_target_partitions(3).state(), None, &[], None)
+            .await
+            .unwrap();
+        let scan = plan.as_any().downcast_ref::<IcebergTableScan>().unwrap();
+
+        let total_files: usize = scan.buckets().iter().map(|b| b.len()).sum();
+        assert_eq!(total_files, 5);
+        assert_eq!(scan.buckets().len(), 3);
+        assert!(matches!(
+            scan.properties().partitioning,
+            Partitioning::UnknownPartitioning(3)
+        ));
+    }
+
+    /// Bucket count must be capped at the number of files: spinning up more
+    /// DataFusion partitions than there are tasks would just leave empty
+    /// streams, wasting scheduler slots.
+    #[tokio::test]
+    async fn test_bucket_count_capped_at_file_count() {
+        let (catalog, namespace, table_name, _temp_dir) =
+            make_catalog_and_table_for_bucketing().await;
+        append_fake_data_files(&catalog, &namespace, &table_name, 2).await;
+
+        let provider = IcebergTableProvider::try_new(catalog, namespace, table_name)
+            .await
+            .unwrap();
+        let plan = provider
+            .scan(&ctx_with_target_partitions(16).state(), None, &[], None)
+            .await
+            .unwrap();
+        let scan = plan.as_any().downcast_ref::<IcebergTableScan>().unwrap();
+
+        assert_eq!(scan.buckets().len(), 2);
+    }
+
+    /// target_partitions = 1 collapses every task into a single bucket, giving
+    /// the same execution profile as a single-partition scan.
+    #[tokio::test]
+    async fn test_single_target_partition_single_bucket() {
+        let (catalog, namespace, table_name, _temp_dir) =
+            make_catalog_and_table_for_bucketing().await;
+        append_fake_data_files(&catalog, &namespace, &table_name, 4).await;
+
+        let provider = IcebergTableProvider::try_new(catalog, namespace, table_name)
+            .await
+            .unwrap();
+        let plan = provider
+            .scan(&ctx_with_target_partitions(1).state(), None, &[], None)
+            .await
+            .unwrap();
+        let scan = plan.as_any().downcast_ref::<IcebergTableScan>().unwrap();
+
+        assert_eq!(scan.buckets().len(), 1);
+        assert_eq!(scan.buckets()[0].len(), 4);
+    }
+
+    async fn make_partitioned_catalog_and_table_for_bucketing()
+    -> (Arc<dyn Catalog>, NamespaceIdent, String, tempfile::TempDir) {
+        use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
+        use iceberg::spec::{
+            NestedField, PrimitiveType, Schema, Transform, Type, UnboundPartitionSpec,
+        };
+        use iceberg::{CatalogBuilder, TableCreation};
+
+        let temp_dir = tempfile::TempDir::new().unwrap();
+        let warehouse = temp_dir.path().to_str().unwrap().to_string();
+
+        let catalog = Arc::new(
+            MemoryCatalogBuilder::default()
+                .load(
+                    "memory",
+                    std::collections::HashMap::from([(
+                        MEMORY_CATALOG_WAREHOUSE.to_string(),
+                        warehouse.clone(),
+                    )]),
+                )
+                .await
+                .unwrap(),
+        );
+
+        let namespace = NamespaceIdent::new("ns".to_string());
+        catalog
+            .create_namespace(&namespace, std::collections::HashMap::new())
+            .await
+            .unwrap();
+
+        let schema = Schema::builder()
+            .with_schema_id(0)
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
+            ])
+            .build()
+            .unwrap();
+
+        let partition_spec = UnboundPartitionSpec::builder()
+            .with_spec_id(0)
+            .add_partition_field(2, "name_part", Transform::Identity)
+            .unwrap()
+            .build();
+
+        catalog
+            .create_table(
+                &namespace,
+                TableCreation::builder()
+                    .name("t".to_string())
+                    .location(format!("{warehouse}/t"))
+                    .schema(schema)
+                    .partition_spec(partition_spec)
+                    .properties(std::collections::HashMap::new())
+                    .build(),
+            )
+            .await
+            .unwrap();
+
+        (catalog, namespace, "t".to_string(), temp_dir)
+    }
+
+    /// Like [`append_fake_data_files`] but each file carries a partition tuple
+    /// matching the table's identity-partition spec on `name`.
+    async fn append_partitioned_fake_data_files(
+        catalog: &Arc<dyn Catalog>,
+        namespace: &NamespaceIdent,
+        table_name: &str,
+        partition_values: Vec<&str>,
+    ) {
+        use iceberg::spec::{DataContentType, DataFileBuilder, DataFileFormat, Literal, Struct};
+        use iceberg::transaction::{ApplyTransactionAction, Transaction};
+
+        let table = catalog
+            .load_table(&TableIdent::new(namespace.clone(), table_name.to_string()))
+            .await
+            .unwrap();
+
+        let data_files = partition_values
+            .iter()
+            .enumerate()
+            .map(|(i, value)| {
+                DataFileBuilder::default()
+                    .content(DataContentType::Data)
+                    .file_path(format!(
+                        "{}/data/fake_{i}.parquet",
+                        table.metadata().location()
+                    ))
+                    .file_format(DataFileFormat::Parquet)
+                    .file_size_in_bytes(128)
+                    .record_count(1)
+                    .partition_spec_id(table.metadata().default_partition_spec_id())
+                    .partition(Struct::from_iter(vec![Some(Literal::string(*value))]))
+                    .build()
+                    .unwrap()
+            })
+            .collect::<Vec<_>>();
+
+        let tx = Transaction::new(&table);
+        let action = tx.fast_append().add_data_files(data_files);
+        action
+            .apply(tx)
+            .unwrap()
+            .commit(catalog.as_ref())
+            .await
+            .unwrap();
+    }
+
+    /// Identity-partitioned table whose source column is in the projection
+    /// must produce `Partitioning::Hash` referencing that column.

Review Comment:
   Nice fixtures (MemoryCatalog + synthesized data files, no Parquet needed). A 
few branches I don't see covered that seem worth locking down since they fail 
silently if they regress:
   - co-location correctness vs `RepartitionExec` (see the bucketing.rs:135 
comment), the most important one;
   - spec evolution -> `UnknownPartitioning`;
   - an unsupported partition dtype (*e.g.*, timestamp) -> 
`UnknownPartitioning`;
   - a null partition value -> fallback hashing / `Unknown`.
   



##########
crates/integrations/datafusion/src/table/mod.rs:
##########
@@ -124,26 +128,93 @@ impl TableProvider for IcebergTableProvider {
 
     async fn scan(
         &self,
-        _state: &dyn Session,
+        state: &dyn Session,
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
         limit: Option<usize>,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
-        // Load fresh table metadata from catalog
+        // Second load: fetch the latest snapshot so scans always reflect current table state.
         let table = self
             .catalog
             .load_table(&self.table_ident)
             .await
             .map_err(to_datafusion_error)?;
 
-        // Create scan with fresh metadata (always use current snapshot)
-        Ok(Arc::new(IcebergTableScan::new(
+        // Build a TableScan mirroring the inputs we'll hand to IcebergTableScan,
+        // so plan_files() uses the same projection/filters the scan will replay in execute().
+        let col_names = projection.map(|indices| {
+            indices
+                .iter()
+                .map(|&i| self.schema.field(i).name().clone())
+                .collect::<Vec<_>>()
+        });
+
+        let predicate = convert_filters_to_predicate(filters);

Review Comment:
   `convert_filters_to_predicate(filters)` is computed here to build the 
planning scan, and then again inside `IcebergTableScan::new_inner` (it receives 
`filters` and re-converts). Could we convert once and thread the 
`Option<Predicate>` through `new_with_tasks` so the scan node doesn't redo it?



##########
crates/integrations/datafusion/src/table/bucketing.rs:
##########
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use datafusion::arrow::array::{
+    ArrayRef, BooleanArray, Date32Array, Float32Array, Float64Array, Int32Array, Int64Array,
+    StringArray,
+};
+use datafusion::arrow::datatypes::{DataType, Schema as ArrowSchema};
+use datafusion::common::hash_utils::create_hashes;
+use datafusion::physical_plan::repartition::REPARTITION_RANDOM_STATE;
+use iceberg::scan::FileScanTask;
+use iceberg::spec::{Literal, PrimitiveLiteral, Transform};
+use iceberg::table::Table;
+
+/// Identity-partitioned column that is also present in the output projection
+/// and whose Arrow type can be reconstructed from a `Literal` for hashing.
+pub(super) struct IdentityCol {
+    pub(super) name: String,
+    /// Position of this column in the *output* schema (after projection).
+    pub(super) output_idx: usize,
+    /// Position of this column inside the partition spec's `fields()` slice,
+    /// matching the slot order of `FileScanTask::partition`.
+    pub(super) spec_field_idx: usize,
+    pub(super) output_dtype: DataType,
+}
+
+/// Inspect the table's default partition spec and return the list of identity
+/// columns that can support a [`Partitioning::Hash`] declaration. Returns
+/// `None` if any condition is violated:
+///   - the source column for an identity field is not in the output projection
+///   - the source column's Arrow type is not currently supported by
+///     [`literal_to_array`]
+///   - the table has spec evolution (>1 historical specs), since older files
+///     may carry a partition tuple that does not align with the default spec
+///
+/// Returning `None` forces the scan to declare `UnknownPartitioning` even if
+/// bucketing succeeds.
+pub(super) fn compute_identity_cols(
+    table: &Table,
+    output_schema: &ArrowSchema,
+) -> Option<Vec<IdentityCol>> {
+    let metadata = table.metadata();
+    if metadata.partition_specs_iter().len() > 1 {
+        return None;
+    }
+    let spec = metadata.default_partition_spec();
+    let table_schema = metadata.current_schema();
+
+    let mut cols = Vec::new();
+    for (spec_field_idx, pf) in spec.fields().iter().enumerate() {
+        if pf.transform != Transform::Identity {
+            continue;
+        }
+        let source_field = table_schema.field_by_id(pf.source_id)?;
+        let output_idx = output_schema.index_of(source_field.name.as_str()).ok()?;
+        let output_dtype = output_schema.field(output_idx).data_type().clone();
+        if !is_supported_dtype(&output_dtype) {
+            return None;
+        }
+        cols.push(IdentityCol {
+            name: source_field.name.clone(),
+            output_idx,
+            spec_field_idx,
+            output_dtype,
+        });
+    }
+    Some(cols)
+}
+
+fn is_supported_dtype(dt: &DataType) -> bool {
+    matches!(
+        dt,
+        DataType::Boolean
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+            | DataType::Utf8
+            | DataType::Date32
+    )
+}
+
+/// Distribute `tasks` across `n_partitions` buckets. When `identity_cols`
+/// describes a non-empty, hashable identity key, each task is hashed on
+/// that key using DataFusion's repartition hash so the resulting partitioning
+/// matches what `RepartitionExec` would produce on the same data. Tasks
+/// missing partition data fall back to hashing `data_file_path`, which still
+/// distributes evenly but breaks the `Hash` contract — the second tuple
+/// element flags whether every task supplied a full identity key.
+pub(super) fn bucket_tasks(
+    tasks: Vec<FileScanTask>,
+    n_partitions: usize,
+    identity_cols: Option<&[IdentityCol]>,
+) -> (Vec<Vec<FileScanTask>>, bool) {
+    if n_partitions == 0 {
+        return (Vec::new(), tasks.is_empty());
+    }
+    let mut buckets: Vec<Vec<FileScanTask>> = (0..n_partitions).map(|_| Vec::new()).collect();
+    let mut all_full_key = true;
+    let cols = identity_cols.unwrap_or(&[]);
+
+    for task in tasks {
+        let bucket_idx = match identity_hash(&task, cols) {
+            Some(h) => (h % n_partitions as u64) as usize,
+            None => {
+                all_full_key = false;
+                fallback_hash(&task) as usize % n_partitions
+            }
+        };
+        buckets[bucket_idx].push(task);
+    }
+    (buckets, all_full_key)
+}
+
+/// Hash the identity-partition values of `task` using
+/// [`REPARTITION_RANDOM_STATE`] so the bucket assignment matches DataFusion's
+/// hash-repartition convention. Returns `None` if the task lacks partition
+/// data or any required slot is null/unsupported.
+fn identity_hash(task: &FileScanTask, cols: &[IdentityCol]) -> Option<u64> {

Review Comment:
   The `Hash` declaration is only sound if the array we build here hashes 
*identically* to the column DataFusion sees at scan time (same `create_hashes` 
+ `REPARTITION_RANDOM_STATE`). That holds today because the reader emits 
`Utf8`/`Int32`/`Date32` and we rebuild those exact types, but it's a quiet trap 
   if it ever drifts. For example, DataFusion hashes `Utf8` via `<&str>::hash_one` 
but `Utf8View` via the view word (`hash_array` vs 
`hash_generic_byte_view_array` in `datafusion/common/src/hash_utils.rs`), so 
the same string hashes differently. If the reader (or a downstream coercion) 
ever produces `Utf8View` without repartitioning this side, equal keys would 
split across partitions and a join/aggregate that trusts the declared 
partitioning would return wrong results silently.
   
   Two requests: (1) a comment here making the "array dtype must equal the 
reader's output dtype" invariant explicit, and (2) a test that actually 
exercises co-location: run `create_hashes(REPARTITION_RANDOM_STATE)` over a 
real array of the partition values and assert each row lands in the same bucket 
this code assigned. Right now the tests assert the *shape* of the partitioning 
but never that the bucketing agrees with `RepartitionExec`.
   
   Separately, the hashing is structured the slow way: for every task it 
allocates a `Vec<ArrayRef>`, builds one single-element array per identity 
column, allocates a one-element `hashes` buffer, and makes its own 
`create_hashes` call. That is O(tasks * cols) tiny allocations plus one 
hash-kernel invocation per file, all on the planning path that gates every 
query. This should be a single pass: build one array of length `n_tasks` per 
identity column (one element per task), call `create_hashes` once to fill an 
`n_tasks` buffer, then take `hash % n_partitions` per task. Same result, but 
allocations drop from per-task to per-column and the kernel runs once instead 
of N times. Tables with tens of thousands of files are routine, so I would 
rather fix this now than ship a per-file hot loop in planning.
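   
   A minimal, std-only sketch of the single-pass shape described above. All 
   names here (`Task`, `hash_columns`, `bucket_batched`) are hypothetical, and 
   `hash_columns` only models the call shape of one `create_hashes` invocation: 
   it uses `DefaultHasher`, so it does not reproduce DataFusion's hash values. 
   The point is the structure: one column of length `n_tasks` per identity 
   column, one hashing call, then `hash % n_partitions` per task.
   
   ```rust
   use std::collections::hash_map::DefaultHasher;
   use std::hash::{Hash, Hasher};

   /// Hypothetical stand-in for a planned file task: a path plus one optional
   /// identity-partition value per identity column.
   #[derive(Debug, Clone, PartialEq)]
   pub struct Task {
       pub path: String,
       pub partition: Vec<Option<String>>,
   }

   /// Stand-in for a single `create_hashes` call: column-major input (one Vec
   /// per column, one element per row), one hash per row as output.
   /// Assumption: this is NOT DataFusion's hash; only the call shape is modeled.
   pub fn hash_columns(columns: &[Vec<String>], n_rows: usize) -> Vec<u64> {
       (0..n_rows)
           .map(|row| {
               let mut hasher = DefaultHasher::new();
               for col in columns {
                   col[row].hash(&mut hasher);
               }
               hasher.finish()
           })
           .collect()
   }

   /// Bucket all tasks in one pass. Returns `None` when any task has a null
   /// slot or the wrong number of slots, so the caller can fall back to
   /// path hashing and declare unknown partitioning.
   pub fn bucket_batched(
       tasks: Vec<Task>,
       n_cols: usize,
       n_partitions: usize,
   ) -> Option<Vec<Vec<Task>>> {
       if n_cols == 0 || n_partitions == 0 {
           return None;
       }
       // One column per identity column, one element per task.
       let mut columns: Vec<Vec<String>> =
           (0..n_cols).map(|_| Vec::with_capacity(tasks.len())).collect();
       for task in &tasks {
           if task.partition.len() != n_cols {
               return None;
           }
           for (col, slot) in columns.iter_mut().zip(&task.partition) {
               col.push(slot.clone()?);
           }
       }
       // Single hashing call for the whole batch.
       let hashes = hash_columns(&columns, tasks.len());
       let mut buckets: Vec<Vec<Task>> = vec![Vec::new(); n_partitions];
       for (task, h) in tasks.into_iter().zip(hashes) {
           buckets[(h % n_partitions as u64) as usize].push(task);
       }
       Some(buckets)
   }
   ```
   
   A co-location test has the same shape: hash each task's key on its own and 
   assert it selects the bucket the batch assigned. In the real code the 
   stand-in would be Arrow arrays plus `create_hashes` with 
   `REPARTITION_RANDOM_STATE`.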



##########
crates/integrations/datafusion/src/table/mod.rs:
##########
@@ -124,26 +128,93 @@ impl TableProvider for IcebergTableProvider {
 
     async fn scan(
         &self,
-        _state: &dyn Session,
+        state: &dyn Session,
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
         limit: Option<usize>,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
-        // Load fresh table metadata from catalog
+        // Second load: fetch the latest snapshot so scans always reflect current table state.
         let table = self
             .catalog
             .load_table(&self.table_ident)
             .await
             .map_err(to_datafusion_error)?;
 
-        // Create scan with fresh metadata (always use current snapshot)
-        Ok(Arc::new(IcebergTableScan::new(
+        // Build a TableScan mirroring the inputs we'll hand to IcebergTableScan,
+        // so plan_files() uses the same projection/filters the scan will replay in execute().
+        let col_names = projection.map(|indices| {
+            indices
+                .iter()
+                .map(|&i| self.schema.field(i).name().clone())
+                .collect::<Vec<_>>()
+        });
+
+        let predicate = convert_filters_to_predicate(filters);
+
+        let mut builder = table.scan();
+        builder = match col_names {
+            Some(names) => builder.select(names),
+            None => builder.select_all(),
+        };
+        if let Some(pred) = predicate {
+            builder = builder.with_filter(pred);
+        }
+
+        let tasks: Vec<FileScanTask> = builder
+            .build()
+            .map_err(to_datafusion_error)?
+            .plan_files()

Review Comment:
   This makes `scan()` do manifest I/O during planning. The PR description 
already calls out the trade-off, which I appreciate. One question: if 
DataFusion calls `scan()` more than once during optimization, do we re-plan 
each time? iceberg-java also plans at plan time but caches the result lazily 
behind `tasks()`/`taskGroups()`. Would it be worth caching the planned/bucketed 
tasks keyed by (snapshot, projection, filter) so repeated planning doesn't 
re-read manifests?
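   
   A rough, std-only sketch of what such a cache could look like. `PlanKey`, 
   `PlanCache`, and `get_or_plan` are hypothetical names, the filter is keyed 
   by an assumed canonical string form, and the sketch is synchronous and 
   unbounded; the real `plan_files()` is async and a real cache would need 
   eviction.
   
   ```rust
   use std::collections::HashMap;
   use std::sync::{Arc, Mutex};

   /// Hypothetical cache key: snapshot id, projected column names, and a
   /// canonical string form of the filter (an assumption of this sketch).
   #[derive(Debug, Clone, PartialEq, Eq, Hash)]
   pub struct PlanKey {
       pub snapshot_id: i64,
       pub projection: Option<Vec<String>>,
       pub filter: Option<String>,
   }

   /// Caches one planned result of type `T` (for example, bucketed tasks) per key.
   pub struct PlanCache<T> {
       entries: Mutex<HashMap<PlanKey, Arc<T>>>,
   }

   impl<T> PlanCache<T> {
       pub fn new() -> Self {
           Self { entries: Mutex::new(HashMap::new()) }
       }

       /// Return the cached plan for `key`, or run `plan` and store its result.
       /// Planning runs outside the lock, so two concurrent callers may both
       /// plan; the first insert wins and both receive the same `Arc`.
       pub fn get_or_plan<E>(
           &self,
           key: PlanKey,
           plan: impl FnOnce() -> Result<T, E>,
       ) -> Result<Arc<T>, E> {
           let cached = self.entries.lock().unwrap().get(&key).cloned();
           if let Some(hit) = cached {
               return Ok(hit);
           }
           let planned = Arc::new(plan()?);
           let mut entries = self.entries.lock().unwrap();
           Ok(Arc::clone(entries.entry(key).or_insert(planned)))
       }
   }
   ```
   
   Because the snapshot id is part of the key, a new commit naturally misses 
   the cache instead of serving stale tasks.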



##########
crates/integrations/datafusion/src/physical_plan/scan.rs:
##########
@@ -31,46 +30,113 @@ use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProp
 use datafusion::prelude::Expr;
 use futures::{Stream, TryStreamExt};
 use iceberg::expr::Predicate;
+use iceberg::scan::{FileScanTask, TableScan};
 use iceberg::table::Table;
 
 use super::expr_to_predicate::convert_filters_to_predicate;
 use crate::to_datafusion_error;
 
-/// Manages the scanning process of an Iceberg [`Table`], encapsulating the
-/// necessary details and computed properties required for execution planning.
+/// Iceberg [`Table`] scan as a DataFusion [`ExecutionPlan`].
+///
+/// Has two construction modes: [`IcebergTableScan::new`] for a lazy
+/// single-partition scan, and [`IcebergTableScan::new_with_tasks`] for an
+/// eager multi-partition scan over pre-planned [`FileScanTask`] buckets.
+///
+/// Note: in eager mode the underlying `TableScan` is rebuilt on every
+/// `execute(partition)` call. The per-build cost is bounded (no I/O) and
+/// keeps the plan free of `Arc`-shared evaluator caches that are awkward to
+/// serialize across workers.
 #[derive(Debug)]
 pub struct IcebergTableScan {
     /// A table in the catalog.
     table: Table,
     /// Snapshot of the table to scan.
     snapshot_id: Option<i64>,
-    /// Stores certain, often expensive to compute,
-    /// plan properties used in query optimization.
+    /// Cached plan properties used by query optimization.
     plan_properties: Arc<PlanProperties>,
-    /// Projection column names, None means all columns
+    /// Projection column names, None means all columns.
     projection: Option<Vec<String>>,
-    /// Filters to apply to the table scan
+    /// Filters to apply to the table scan.
     predicates: Option<Predicate>,
-    /// Optional limit on the number of rows to return
+    /// Pre-planned file scan tasks per partition (eager mode), or `None` (lazy mode).
+    buckets: Option<Vec<Vec<FileScanTask>>>,
+    /// Optional limit on the number of rows to return.
     limit: Option<usize>,
 }
 
 impl IcebergTableScan {
-    /// Creates a new [`IcebergTableScan`] object.
-    pub(crate) fn new(
+    /// Creates a lazy single-partition scan that plans and reads all tasks
+    /// inside `execute(0)`. Used by
+    /// [`IcebergStaticTableProvider`][crate::table::IcebergStaticTableProvider].
+    pub fn new(
+        table: Table,
+        snapshot_id: Option<i64>,
+        schema: ArrowSchemaRef,
+        projection: Option<&Vec<usize>>,
+        filters: &[Expr],
+        limit: Option<usize>,
+    ) -> Self {
+        Self::new_inner(
+            table,
+            snapshot_id,
+            schema,
+            projection,
+            filters,
+            limit,
+            Partitioning::UnknownPartitioning(1),
+            None,
+        )
+    }
+
+    /// Creates an eager multi-partition scan over pre-planned task buckets.
+    /// Partition `i` streams `buckets[i]`. The caller is responsible for
+    /// ensuring `partitioning` matches the bucketing. Used by
+    /// [`IcebergTableProvider`][crate::table::IcebergTableProvider].
+    #[allow(clippy::too_many_arguments)]

Review Comment:
   `new_with_tasks` takes raw `buckets` *and* a `Partitioning` with nothing 
enforcing `buckets.len() == partitioning.partition_count()`, and both this and 
`new` need `#[allow(clippy::too_many_arguments)]`. Could we collapse these into 
a small params struct (or builder) that validates the bucket/partitioning 
consistency and returns `Result`? That also removes the 
`new`/`new_with_tasks`/`new_inner` parameter repetition.
   
   Related: `new` and `new_with_tasks` are now `pub`. Is that needed for the 
two in-crate providers, or could they stay `pub(crate)`? Public constructors 
that can build an inconsistent node are easy to misuse.
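   
   A small, std-only sketch of the validated-params idea. `Partitioning` below 
   is a hypothetical stand-in for DataFusion's type (only `partition_count()` 
   matters here), and `EagerScanParams` / `try_new` are illustrative names, 
   not existing API.
   
   ```rust
   /// Hypothetical stand-in for DataFusion's `Partitioning`.
   #[derive(Debug, Clone, PartialEq)]
   pub enum Partitioning {
       Unknown(usize),
       Hash(Vec<String>, usize),
   }

   impl Partitioning {
       pub fn partition_count(&self) -> usize {
           match self {
               Partitioning::Unknown(n) | Partitioning::Hash(_, n) => *n,
           }
       }
   }

   /// Bundles buckets with their declared partitioning. Fields are private,
   /// so the only way to build one is through the checked constructor.
   #[derive(Debug, Clone, PartialEq)]
   pub struct EagerScanParams<T> {
       buckets: Vec<Vec<T>>,
       partitioning: Partitioning,
   }

   impl<T> EagerScanParams<T> {
       /// Fails when the number of buckets differs from the declared
       /// partition count, instead of building an inconsistent node.
       pub fn try_new(buckets: Vec<Vec<T>>, partitioning: Partitioning) -> Result<Self, String> {
           let expected = partitioning.partition_count();
           if buckets.len() != expected {
               return Err(format!(
                   "bucket count {} does not match partition count {}",
                   buckets.len(),
                   expected
               ));
           }
           Ok(Self { buckets, partitioning })
       }

       pub fn buckets(&self) -> &[Vec<T>] {
           &self.buckets
       }

       pub fn partitioning(&self) -> &Partitioning {
           &self.partitioning
       }
   }
   ```
   
   The scan constructor would then take one `EagerScanParams` value rather 
   than a separate `buckets` and `partitioning` pair.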



##########
crates/integrations/datafusion/src/physical_plan/scan.rs:
##########
@@ -31,46 +30,113 @@ use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProp
 use datafusion::prelude::Expr;
 use futures::{Stream, TryStreamExt};
 use iceberg::expr::Predicate;
+use iceberg::scan::{FileScanTask, TableScan};
 use iceberg::table::Table;
 
 use super::expr_to_predicate::convert_filters_to_predicate;
 use crate::to_datafusion_error;
 
-/// Manages the scanning process of an Iceberg [`Table`], encapsulating the
-/// necessary details and computed properties required for execution planning.
+/// Iceberg [`Table`] scan as a DataFusion [`ExecutionPlan`].
+///
+/// Has two construction modes: [`IcebergTableScan::new`] for a lazy
+/// single-partition scan, and [`IcebergTableScan::new_with_tasks`] for an
+/// eager multi-partition scan over pre-planned [`FileScanTask`] buckets.
+///
+/// Note: in eager mode the underlying `TableScan` is rebuilt on every
+/// `execute(partition)` call. The per-build cost is bounded (no I/O) and
+/// keeps the plan free of `Arc`-shared evaluator caches that are awkward to
+/// serialize across workers.
 #[derive(Debug)]
 pub struct IcebergTableScan {
     /// A table in the catalog.
     table: Table,
     /// Snapshot of the table to scan.
     snapshot_id: Option<i64>,
-    /// Stores certain, often expensive to compute,
-    /// plan properties used in query optimization.
+    /// Cached plan properties used by query optimization.
     plan_properties: Arc<PlanProperties>,
-    /// Projection column names, None means all columns
+    /// Projection column names, None means all columns.
     projection: Option<Vec<String>>,
-    /// Filters to apply to the table scan
+    /// Filters to apply to the table scan.
     predicates: Option<Predicate>,
-    /// Optional limit on the number of rows to return
+    /// Pre-planned file scan tasks per partition (eager mode), or `None` (lazy mode).
+    buckets: Option<Vec<Vec<FileScanTask>>>,

Review Comment:
   The `Option` plus the `buckets()` accessor returning `&[]` via 
`unwrap_or(&[])` could just be a `Vec` that's empty in lazy mode, which would 
drop a layer. That said, the `Option` does signal "lazy vs eager" intent, so I 
could go either way; your call.



##########
crates/integrations/datafusion/src/table/bucketing.rs:
##########
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use datafusion::arrow::array::{
+    ArrayRef, BooleanArray, Date32Array, Float32Array, Float64Array, Int32Array, Int64Array,
+    StringArray,
+};
+use datafusion::arrow::datatypes::{DataType, Schema as ArrowSchema};
+use datafusion::common::hash_utils::create_hashes;
+use datafusion::physical_plan::repartition::REPARTITION_RANDOM_STATE;
+use iceberg::scan::FileScanTask;
+use iceberg::spec::{Literal, PrimitiveLiteral, Transform};
+use iceberg::table::Table;
+
+/// Identity-partitioned column that is also present in the output projection
+/// and whose Arrow type can be reconstructed from a `Literal` for hashing.
+pub(super) struct IdentityCol {
+    pub(super) name: String,
+    /// Position of this column in the *output* schema (after projection).
+    pub(super) output_idx: usize,
+    /// Position of this column inside the partition spec's `fields()` slice,
+    /// matching the slot order of `FileScanTask::partition`.
+    pub(super) spec_field_idx: usize,
+    pub(super) output_dtype: DataType,
+}
+
+/// Inspect the table's default partition spec and return the list of identity
+/// columns that can support a [`Partitioning::Hash`] declaration. Returns
+/// `None` if any condition is violated:
+///   - the source column for an identity field is not in the output projection
+///   - the source column's Arrow type is not currently supported by
+///     [`literal_to_array`]
+///   - the table has spec evolution (>1 historical specs), since older files
+///     may carry a partition tuple that does not align with the default spec
+///
+/// Returning `None` forces the scan to declare `UnknownPartitioning` even if
+/// bucketing succeeds.
+pub(super) fn compute_identity_cols(
+    table: &Table,
+    output_schema: &ArrowSchema,
+) -> Option<Vec<IdentityCol>> {
+    let metadata = table.metadata();
+    if metadata.partition_specs_iter().len() > 1 {
+        return None;
+    }
+    let spec = metadata.default_partition_spec();
+    let table_schema = metadata.current_schema();
+
+    let mut cols = Vec::new();
+    for (spec_field_idx, pf) in spec.fields().iter().enumerate() {
+        if pf.transform != Transform::Identity {
+            continue;
+        }
+        let source_field = table_schema.field_by_id(pf.source_id)?;
+        let output_idx = output_schema.index_of(source_field.name.as_str()).ok()?;
+        let output_dtype = output_schema.field(output_idx).data_type().clone();
+        if !is_supported_dtype(&output_dtype) {
+            return None;
+        }
+        cols.push(IdentityCol {
+            name: source_field.name.clone(),
+            output_idx,
+            spec_field_idx,
+            output_dtype,
+        });
+    }
+    Some(cols)
+}
+
+fn is_supported_dtype(dt: &DataType) -> bool {
+    matches!(
+        dt,
+        DataType::Boolean
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+            | DataType::Utf8
+            | DataType::Date32
+    )
+}
+
+/// Distribute `tasks` across `n_partitions` buckets. When `identity_cols`
+/// describes a non-empty, hashable identity key, each task is hashed on
+/// that key using DataFusion's repartition hash so the resulting partitioning
+/// matches what `RepartitionExec` would produce on the same data. Tasks
+/// missing partition data fall back to hashing `data_file_path`, which still
+/// distributes evenly but breaks the `Hash` contract — the second tuple
+/// element flags whether every task supplied a full identity key.
+pub(super) fn bucket_tasks(
+    tasks: Vec<FileScanTask>,
+    n_partitions: usize,
+    identity_cols: Option<&[IdentityCol]>,
+) -> (Vec<Vec<FileScanTask>>, bool) {
+    if n_partitions == 0 {
+        return (Vec::new(), tasks.is_empty());
+    }
+    let mut buckets: Vec<Vec<FileScanTask>> = (0..n_partitions).map(|_| Vec::new()).collect();
+    let mut all_full_key = true;
+    let cols = identity_cols.unwrap_or(&[]);
+
+    for task in tasks {
+        let bucket_idx = match identity_hash(&task, cols) {
+            Some(h) => (h % n_partitions as u64) as usize,
+            None => {
+                all_full_key = false;
+                fallback_hash(&task) as usize % n_partitions
+            }
+        };
+        buckets[bucket_idx].push(task);
+    }
+    (buckets, all_full_key)
+}
+
+/// Hash the identity-partition values of `task` using
+/// [`REPARTITION_RANDOM_STATE`] so the bucket assignment matches DataFusion's
+/// hash-repartition convention. Returns `None` if the task lacks partition
+/// data or any required slot is null/unsupported.
+fn identity_hash(task: &FileScanTask, cols: &[IdentityCol]) -> Option<u64> {
+    if cols.is_empty() {
+        return None;
+    }
+    let partition = task.partition.as_ref()?;
+    let mut arrays: Vec<ArrayRef> = Vec::with_capacity(cols.len());
+    for col in cols {
+        let lit = partition.fields().get(col.spec_field_idx)?.as_ref()?;
+        arrays.push(literal_to_array(lit, &col.output_dtype)?);
+    }
+    let mut hashes = vec![0u64; 1];
+    create_hashes(
+        &arrays,
+        REPARTITION_RANDOM_STATE.random_state(),
+        &mut hashes,
+    )
+    .ok()?;
+    Some(hashes[0])
+}
+
+/// Deterministic per-file fallback used when `identity_hash` cannot produce a
+/// bucket. The hash function does not need to match DataFusion's because any
+/// task taking this path causes the scan to drop to `UnknownPartitioning`.
+fn fallback_hash(task: &FileScanTask) -> u64 {
+    use std::collections::hash_map::DefaultHasher;
+    use std::hash::{Hash, Hasher};
+    let mut hasher = DefaultHasher::new();
+    task.data_file_path.hash(&mut hasher);
+    hasher.finish()
+}
+
+/// Materialize a single-element Arrow array of `dt` holding the value of
+/// `lit`. The Arrow type must match what DataFusion will see for this column
+/// at scan time, otherwise `create_hashes` would dispatch on a different type
+/// and produce a hash that disagrees with DataFusion's row-wise hashing.
+fn literal_to_array(lit: &Literal, dt: &DataType) -> Option<ArrayRef> {

Review Comment:
   I think this duplicates `create_primitive_array_single_element` in 
`crates/iceberg/src/arrow/value.rs:627` (currently `pub(crate)`), which already 
maps a `Literal` to a single-element array for ~20 types including Timestamp, 
Decimal128, and Binary, and handles nulls. Could we promote that to `pub` (or 
re-export it) and call it here? That would drop ~60 lines and, combined with 
`is_supported_dtype`, widen the set of partition types that can declare `Hash` 
instead of falling back. The deliberate type gate can stay where it is.



##########
crates/integrations/datafusion/src/physical_plan/scan.rs:
##########
@@ -143,36 +213,34 @@ impl ExecutionPlan for IcebergTableScan {
 
     fn execute(
         &self,
-        _partition: usize,
+        partition: usize,
         _context: Arc<TaskContext>,
     ) -> DFResult<SendableRecordBatchStream> {
-        let fut = get_batch_stream(
+        let bucket = match &self.buckets {
+            Some(buckets) => Some(buckets.get(partition).cloned().ok_or_else(|| {

Review Comment:
   `buckets.get(partition).cloned()` deep-copies a whole `Vec<FileScanTask>` on 
every `execute(partition)`, and each `FileScanTask` carries a predicate tree 
plus schema, so this is a real per-partition copy of all the planned task 
state, not a cheap clone. Storing the buckets as `Arc<[Vec<FileScanTask>]>` (or 
wrapping each bucket in an `Arc`) lets `execute` hand out a pointer and drop 
the copy entirely. Since `execute` runs per partition on the query hot path, I 
would make this an `Arc` rather than clone.
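   
   A minimal, std-only sketch of the per-bucket `Arc` variant. `Task` and 
   `ScanNode` are hypothetical stand-ins for `FileScanTask` and the scan node; 
   the only point is that handing out a bucket becomes a reference-count bump 
   instead of a deep copy.
   
   ```rust
   use std::sync::Arc;

   /// Hypothetical stand-in for `FileScanTask`.
   #[derive(Debug, Clone, PartialEq)]
   pub struct Task {
       pub path: String,
   }

   /// Hypothetical scan node that stores each bucket behind an `Arc`.
   pub struct ScanNode {
       buckets: Vec<Arc<[Task]>>,
   }

   impl ScanNode {
       /// Converts each bucket into a shared slice once, at construction time.
       pub fn new(buckets: Vec<Vec<Task>>) -> Self {
           Self {
               buckets: buckets
                   .into_iter()
                   .map(|bucket| Arc::<[Task]>::from(bucket))
                   .collect(),
           }
       }

       /// Returns a shared handle to the bucket for `partition`. Cloning the
       /// `Arc` copies a pointer and bumps a counter; no task is copied.
       pub fn bucket(&self, partition: usize) -> Result<Arc<[Task]>, String> {
           self.buckets
               .get(partition)
               .cloned()
               .ok_or_else(|| format!("partition {} out of range", partition))
       }
   }
   ```
   
   Two calls for the same partition return handles to the same allocation, 
   which `Arc::ptr_eq` can confirm in a test.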



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

