toutane commented on code in PR #2298:
URL: https://github.com/apache/iceberg-rust/pull/2298#discussion_r3420569942
##########
crates/integrations/datafusion/src/table/bucketing.rs:
##########
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use datafusion::arrow::array::{
+    ArrayRef, BooleanArray, Date32Array, Float32Array, Float64Array, Int32Array, Int64Array,
+    StringArray,
+};
+use datafusion::arrow::datatypes::{DataType, Schema as ArrowSchema};
+use datafusion::common::hash_utils::create_hashes;
+use datafusion::physical_plan::repartition::REPARTITION_RANDOM_STATE;
+use iceberg::scan::FileScanTask;
+use iceberg::spec::{Literal, PrimitiveLiteral, Transform};
+use iceberg::table::Table;
+
+/// Identity-partitioned column that is also present in the output projection
+/// and whose Arrow type can be reconstructed from a `Literal` for hashing.
+pub(super) struct IdentityCol {
+    pub(super) name: String,
+    /// Position of this column in the *output* schema (after projection).
+    pub(super) output_idx: usize,
+    /// Position of this column inside the partition spec's `fields()` slice,
+    /// matching the slot order of `FileScanTask::partition`.
+    pub(super) spec_field_idx: usize,
+    pub(super) output_dtype: DataType,
+}
+
+/// Inspect the table's default partition spec and return the list of identity
+/// columns that can support a [`Partitioning::Hash`] declaration. Returns
+/// `None` if any condition is violated:
+/// - the source column for an identity field is not in the output projection
+/// - the source column's Arrow type is not currently supported by
+///   [`literal_to_array`]
+/// - the table has spec evolution (>1 historical specs), since older files
+///   may carry a partition tuple that does not align with the default spec
+///
+/// Returning `None` forces the scan to declare `UnknownPartitioning` even if
+/// bucketing succeeds.
+pub(super) fn compute_identity_cols(
+    table: &Table,
+    output_schema: &ArrowSchema,
+) -> Option<Vec<IdentityCol>> {
+    let metadata = table.metadata();
+    if metadata.partition_specs_iter().len() > 1 {
+        return None;
+    }
+    let spec = metadata.default_partition_spec();
+    let table_schema = metadata.current_schema();
+
+    let mut cols = Vec::new();
+    for (spec_field_idx, pf) in spec.fields().iter().enumerate() {
+        if pf.transform != Transform::Identity {
+            continue;
+        }
+        let source_field = table_schema.field_by_id(pf.source_id)?;
+        let output_idx = output_schema.index_of(source_field.name.as_str()).ok()?;
+        let output_dtype = output_schema.field(output_idx).data_type().clone();
+        if !is_supported_dtype(&output_dtype) {
+            return None;
+        }
+        cols.push(IdentityCol {
+            name: source_field.name.clone(),
+            output_idx,
+            spec_field_idx,
+            output_dtype,
+        });
+    }
+    Some(cols)
+}
+
+fn is_supported_dtype(dt: &DataType) -> bool {
+    matches!(
+        dt,
+        DataType::Boolean
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+            | DataType::Utf8
+            | DataType::Date32
+    )
+}
+
+/// Distribute `tasks` across `n_partitions` buckets. When `identity_cols`
+/// describes a non-empty, hashable identity key, each task is hashed on
+/// that key using DataFusion's repartition hash so the resulting partitioning
+/// matches what `RepartitionExec` would produce on the same data. Tasks
+/// missing partition data fall back to hashing `data_file_path`, which still
+/// distributes evenly but breaks the `Hash` contract — the second tuple
+/// element flags whether every task supplied a full identity key.
+pub(super) fn bucket_tasks(
+    tasks: Vec<FileScanTask>,
+    n_partitions: usize,
+    identity_cols: Option<&[IdentityCol]>,
+) -> (Vec<Vec<FileScanTask>>, bool) {
+    if n_partitions == 0 {
+        return (Vec::new(), tasks.is_empty());
+    }
+    let mut buckets: Vec<Vec<FileScanTask>> = (0..n_partitions).map(|_| Vec::new()).collect();
+    let mut all_full_key = true;
+    let cols = identity_cols.unwrap_or(&[]);
+
+    for task in tasks {
+        let bucket_idx = match identity_hash(&task, cols) {
+            Some(h) => (h % n_partitions as u64) as usize,
+            None => {
+                all_full_key = false;
+                fallback_hash(&task) as usize % n_partitions
+            }
+        };
+        buckets[bucket_idx].push(task);
+    }
+    (buckets, all_full_key)
+}
+
+/// Hash the identity-partition values of `task` using
+/// [`REPARTITION_RANDOM_STATE`] so the bucket assignment matches DataFusion's
+/// hash-repartition convention. Returns `None` if the task lacks partition
+/// data or any required slot is null/unsupported.
+fn identity_hash(task: &FileScanTask, cols: &[IdentityCol]) -> Option<u64> {

Review Comment:
> Two requests: (1) a comment here making the "array dtype must equal the reader's output dtype" invariant explicit, and (2) a test that actually exercises co-location: run `create_hashes(REPARTITION_RANDOM_STATE)` over a real array of the partition values and assert each row lands in the same bucket this code assigned.
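The co-location check requested above can be modeled without DataFusion. The sketch below makes several assumptions. It uses std only and a single `i64` identity column. A hypothetical `hash_of` built on `DefaultHasher` stands in for `create_hashes` with `REPARTITION_RANDOM_STATE`. `Task`, `repartition_rows`, and `colocated` are hypothetical names, and `bucket_tasks` only mirrors the shape of the function in the diff. The check expands each file into rows that carry its partition value, hashes those rows the way a repartition would, and requires every row to land in its file's bucket.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical stand-in for `FileScanTask`: a data file plus its single
/// identity-partition value (`None` models missing partition data).
pub struct Task {
    pub path: String,
    pub key: Option<i64>,
}

/// Shared hash function. `DefaultHasher::new()` is deterministic within a
/// process, so it plays the role of a fixed repartition hash state here.
fn hash_of<T: Hash + ?Sized>(value: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    value.hash(&mut hasher);
    hasher.finish()
}

/// Scan side, shaped like `bucket_tasks`: hash the identity key when present,
/// otherwise hash the file path and report that the Hash contract is broken.
pub fn bucket_tasks(tasks: Vec<Task>, n_partitions: usize) -> (Vec<Vec<Task>>, bool) {
    if n_partitions == 0 {
        return (Vec::new(), tasks.is_empty());
    }
    let mut buckets: Vec<Vec<Task>> = (0..n_partitions).map(|_| Vec::new()).collect();
    let mut all_full_key = true;
    for task in tasks {
        let h = match task.key {
            Some(k) => hash_of(&k),
            None => {
                all_full_key = false;
                hash_of(task.path.as_str())
            }
        };
        buckets[(h % n_partitions as u64) as usize].push(task);
    }
    (buckets, all_full_key)
}

/// Repartition side: hash a column of row values, one bucket per row. The row
/// type (`i64`) is exactly the type the scan side hashed; hashing a different
/// representation of the same value would give no co-location guarantee.
pub fn repartition_rows(column: &[i64], n_partitions: usize) -> Vec<usize> {
    column
        .iter()
        .map(|v| (hash_of(v) % n_partitions as u64) as usize)
        .collect()
}

/// Co-location check: expand every keyed file into `rows_per_file` rows that
/// carry its partition value, repartition those rows, and require each row to
/// land in the bucket its file was assigned to.
pub fn colocated(buckets: &[Vec<Task>], rows_per_file: usize) -> bool {
    let mut column = Vec::new();
    let mut expected = Vec::new();
    for (bucket_idx, tasks) in buckets.iter().enumerate() {
        for task in tasks {
            // Tasks without a key were placed by path and carry no guarantee.
            if let Some(k) = task.key {
                for _ in 0..rows_per_file {
                    column.push(k);
                    expected.push(bucket_idx);
                }
            }
        }
    }
    repartition_rows(&column, buckets.len()) == expected
}
```

Both sides call the same `hash_of` on the same `i64` type, so the check holds. If the row side hashed a different representation of the same logical value (the `Utf8` vs `Utf8View` situation discussed below), nothing would guarantee agreement.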
Both done in ace9b7e1:

- (1) The invariant is now stated where we declare `Partitioning::Hash` (table/mod.rs). The declaration is only sound if the arrays we build from partition literals hash identically to what the reader emits at scan time. DataFusion's hash dispatch is dtype-specific, so if the reader's output dtype drifts (e.g. Utf8 to Utf8View), the bucketing path must either materialize that exact dtype or fall back to `UnknownPartitioning`. (Wording later refined in 971aa0f0.)
- (2) `test_identity_partitioned_hash_buckets_match_datafusion_repartition` runs `create_hashes` with `REPARTITION_RANDOM_STATE` over a real array of the partition values. It asserts that every row lands in the bucket `bucket_tasks` assigned. This checks agreement with `RepartitionExec`, not just the shape of the partitioning.

> If the reader (or a downstream coercion) ever produces `Utf8View` without repartitioning this side, equal keys would split across partitions and a join/aggregate that trusts the declared partitioning would return wrong results silently.

This exact trap is covered by `test_unsupported_output_partition_dtype_falls_back_to_unknown_partitioning` (2802af43). It builds an output schema with `Utf8View` as a deliberately unsupported dtype and asserts that the scan declares `UnknownPartitioning` instead of `Hash`.

> Separately, the hashing is structured the slow way: for every task it allocates a `Vec<ArrayRef>`, builds one single-element array per identity column, allocates a one-element `hashes` buffer, and makes its own `create_hashes` call. That is O(tasks * cols) tiny allocations plus one hash-kernel invocation per file [...]. This should be a single pass: build one array of length `n_tasks` per identity column [...] call `create_hashes` once [...] then take `hash % n_partitions` per task.

Done in b6369e26.
`identity_hash` (per-task) is replaced by `identity_hashes_for_tasks`, which works in three steps:

- It creates one `PrimitiveLiteralArrayBuilder` per identity column with `capacity = n_tasks`.
- A single pass appends each task's partition literal.
- One `create_hashes` call fills an `n_tasks`-long buffer, and each task's bucket is `hash % n_partitions`.

Allocations drop from per-task to per-column, and the hash kernel runs once instead of once per task. The per-file fallback (`fallback_hash`) is unchanged and still applies only to tasks missing a full identity key.
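The batched shape described above can be sketched in std-only Rust under these assumptions:

- Every identity column is `i64`, and every task has one slot per identity column.
- `hash_of` and `combine` are hypothetical stand-ins. DataFusion combines multi-column hashes inside `create_hashes` with its own function, which this sketch does not reproduce.
- `buckets_per_task` models the old per-task `identity_hash` loop so the two shapes can be compared.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical task: one `Option<i64>` per identity column, in spec order.
/// Any `None` means the task lacks a full identity key.
pub struct Task {
    pub path: String,
    pub key: Vec<Option<i64>>,
}

fn hash_of<T: Hash + ?Sized>(value: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    value.hash(&mut hasher);
    hasher.finish()
}

/// Hypothetical multi-column combiner; combine(0, h) == h, so the first
/// column's hash passes through unchanged.
fn combine(acc: u64, h: u64) -> u64 {
    acc.rotate_left(5) ^ h
}

/// Old shape: hash each task's key on its own, one task at a time.
/// Assumes `n > 0`.
pub fn buckets_per_task(tasks: &[Task], n: usize) -> Vec<usize> {
    tasks
        .iter()
        .map(|t| {
            let h = if t.key.iter().all(Option::is_some) {
                t.key.iter().flatten().fold(0u64, |acc, v| combine(acc, hash_of(v)))
            } else {
                hash_of(t.path.as_str()) // fallback: no Hash guarantee
            };
            (h % n as u64) as usize
        })
        .collect()
}

/// Batched shape: one column of length `n_tasks` per identity column, filled
/// in a single pass, then one sweep per column over a shared hash buffer.
/// Assumes `n > 0` and that all tasks have the same number of key slots.
pub fn buckets_batched(tasks: &[Task], n: usize) -> Vec<usize> {
    let n_tasks = tasks.len();
    let n_cols = tasks.first().map_or(0, |t| t.key.len());
    let mut columns: Vec<Vec<Option<i64>>> =
        (0..n_cols).map(|_| Vec::with_capacity(n_tasks)).collect();
    for task in tasks {
        for (col, value) in columns.iter_mut().zip(&task.key) {
            col.push(*value);
        }
    }
    // A task qualifies for identity hashing only if every slot is non-null.
    let full: Vec<bool> = (0..n_tasks)
        .map(|i| columns.iter().all(|c| c[i].is_some()))
        .collect();
    // One buffer for all tasks, updated column by column.
    let mut hashes = vec![0u64; n_tasks];
    for col in &columns {
        for (i, value) in col.iter().enumerate() {
            if let Some(v) = value {
                hashes[i] = combine(hashes[i], hash_of(v));
            }
        }
    }
    (0..n_tasks)
        .map(|i| {
            let h = if full[i] { hashes[i] } else { hash_of(tasks[i].path.as_str()) };
            (h % n as u64) as usize
        })
        .collect()
}
```

Batching should change only how many allocations and hash passes happen, not which bucket any task gets. Keeping a per-task reference like `buckets_per_task` next to the batched path in a test is one cheap way to pin that equivalence.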
