toutane commented on code in PR #2298:
URL: https://github.com/apache/iceberg-rust/pull/2298#discussion_r3420714493


##########
crates/integrations/datafusion/src/table/bucketing.rs:
##########
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use datafusion::arrow::array::{
+    ArrayRef, BooleanArray, Date32Array, Float32Array, Float64Array, Int32Array, Int64Array,
+    StringArray,
+};
+use datafusion::arrow::datatypes::{DataType, Schema as ArrowSchema};
+use datafusion::common::hash_utils::create_hashes;
+use datafusion::physical_plan::repartition::REPARTITION_RANDOM_STATE;
+use iceberg::scan::FileScanTask;
+use iceberg::spec::{Literal, PrimitiveLiteral, Transform};
+use iceberg::table::Table;
+
+/// Identity-partitioned column that is also present in the output projection
+/// and whose Arrow type can be reconstructed from a `Literal` for hashing.
+pub(super) struct IdentityCol {
+    pub(super) name: String,
+    /// Position of this column in the *output* schema (after projection).
+    pub(super) output_idx: usize,
+    /// Position of this column inside the partition spec's `fields()` slice,
+    /// matching the slot order of `FileScanTask::partition`.
+    pub(super) spec_field_idx: usize,
+    pub(super) output_dtype: DataType,
+}
+
+/// Inspect the table's default partition spec and return the list of identity
+/// columns that can support a [`Partitioning::Hash`] declaration. Returns
+/// `None` if any condition is violated:
+///   - the source column for an identity field is not in the output projection
+///   - the source column's Arrow type is not currently supported by
+///     [`literal_to_array`]
+///   - the table has spec evolution (>1 historical specs), since older files
+///     may carry a partition tuple that does not align with the default spec
+///
+/// Returning `None` forces the scan to declare `UnknownPartitioning` even if
+/// bucketing succeeds.
+pub(super) fn compute_identity_cols(
+    table: &Table,
+    output_schema: &ArrowSchema,
+) -> Option<Vec<IdentityCol>> {
+    let metadata = table.metadata();
+    if metadata.partition_specs_iter().len() > 1 {
+        return None;
+    }
+    let spec = metadata.default_partition_spec();
+    let table_schema = metadata.current_schema();
+
+    let mut cols = Vec::new();
+    for (spec_field_idx, pf) in spec.fields().iter().enumerate() {
+        if pf.transform != Transform::Identity {
+            continue;
+        }
+        let source_field = table_schema.field_by_id(pf.source_id)?;
+        let output_idx = output_schema.index_of(source_field.name.as_str()).ok()?;
+        let output_dtype = output_schema.field(output_idx).data_type().clone();
+        if !is_supported_dtype(&output_dtype) {
+            return None;
+        }
+        cols.push(IdentityCol {
+            name: source_field.name.clone(),
+            output_idx,
+            spec_field_idx,
+            output_dtype,
+        });
+    }
+    Some(cols)
+}
+
+fn is_supported_dtype(dt: &DataType) -> bool {
+    matches!(
+        dt,
+        DataType::Boolean
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+            | DataType::Utf8
+            | DataType::Date32
+    )
+}
+
+/// Distribute `tasks` across `n_partitions` buckets. When `identity_cols`
+/// describes a non-empty, hashable identity key, each task is hashed on
+/// that key using DataFusion's repartition hash so the resulting partitioning
+/// matches what `RepartitionExec` would produce on the same data. Tasks
+/// missing partition data fall back to hashing `data_file_path`, which still
+/// distributes evenly but breaks the `Hash` contract — the second tuple
+/// element flags whether every task supplied a full identity key.
+pub(super) fn bucket_tasks(

Review Comment:
   Agreed, documented the limitation in the bucketing module doc, and tracked 
the size-based bin-packing follow-up in #128.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to