liurenjie1024 commented on code in PR #1821:
URL: https://github.com/apache/iceberg-rust/pull/1821#discussion_r2489767661
##########
crates/iceberg/src/scan/task.rs:
##########
@@ -54,6 +58,24 @@ pub struct FileScanTask {
/// The list of delete files that may need to be applied to this data file
pub deletes: Vec<FileScanTaskDeleteFile>,
+
+ /// Partition data from the manifest entry, used to identify which columns can use
+ /// constant values from partition metadata vs. reading from the data file.
+ /// Per the Iceberg spec, only identity-transformed partition fields should use constants.
+ #[serde(skip)]
+ pub partition: Option<Struct>,
+
+ /// The partition spec ID for this file, required to look up the correct
+ /// partition spec and determine which fields are identity-transformed.
+ /// Not serialized as partition data is runtime-only and populated from manifest entries.
+ #[serde(skip)]
+ pub partition_spec_id: Option<i32>,
Review Comment:
Not needed, we have a `spec_id` in `PartitionSpec`.
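Something like the sketch below would avoid carrying the duplicate field. This is illustrative only: it assumes the task holds (or can reach) its `PartitionSpec`, and that `spec_id()` is the accessor.

```rust
// Hypothetical sketch: derive the spec id from the task's PartitionSpec
// instead of storing a separate `partition_spec_id` field.
// `partition_spec` as a field of FileScanTask is assumed here for illustration.
fn partition_spec_id(task: &FileScanTask) -> Option<i32> {
    task.partition_spec.as_ref().map(|spec| spec.spec_id())
}
```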
##########
crates/iceberg/src/arrow/record_batch_transformer.rs:
##########
@@ -270,57 +394,199 @@ impl RecordBatchTransformer {
snapshot_schema: &IcebergSchema,
projected_iceberg_field_ids: &[i32],
field_id_to_mapped_schema_map: HashMap<i32, (FieldRef, usize)>,
+ constants_map: HashMap<i32, PrimitiveLiteral>,
+ partition_spec: Option<&PartitionSpec>,
) -> Result<Vec<ColumnSource>> {
let field_id_to_source_schema_map =
Self::build_field_id_to_arrow_schema_map(source_schema)?;
- projected_iceberg_field_ids.iter().map(|field_id|{
- let (target_field, _) = field_id_to_mapped_schema_map.get(field_id).ok_or(
- Error::new(ErrorKind::Unexpected, "could not find field in schema")
- )?;
- let target_type = target_field.data_type();
+ // Detect field ID conflicts that require name-based mapping.
+ //
+ // Conflicts occur when fields with initial_default (from add_files or schema evolution)
+ // have field IDs that match Parquet field IDs, but refer to different columns.
+ //
+ // Example from add_files (Hive table import):
+ // Parquet file field IDs: name=1, subdept=2
+ // Iceberg schema field IDs: id=1, name=2, dept=3, subdept=4
+ // Partition columns (id, dept) have initial_default values
+ //
+ // Without name-based fallback, we'd incorrectly:
+ // - Read partition column "id" (field_id=1) from Parquet field_id=1 (which is "name")
+ // - Read data column "name" (field_id=2) from Parquet field_id=2 (which is "subdept")
+ //
+ // The fix: When conflicts exist, use name-based mapping for data columns.
+ // This matches Java's behavior in add_files procedure.
+ //
+ // Note: We do NOT treat identity-partitioned fields as conflicts. For identity partitions,
+ // it's NORMAL for the source column to exist in Parquet - we just use the constant from
+ // partition metadata instead of reading the file.
+ //
+ // See: TestAddFilesProcedure.addDataPartitionedByIdAndDept() in iceberg-java
+
+ // Build a set of source field IDs used in NON-identity partition transforms.
+ // These are regular data columns that happen to be used for partitioning (e.g., bucket, truncate).
+ // They should be read from Parquet files normally, not treated as partition columns.
+ let non_identity_partition_source_ids: std::collections::HashSet<i32> =
+ if let Some(spec) = partition_spec {
+ spec.fields()
+ .iter()
+ .filter(|f| !matches!(f.transform, Transform::Identity))
+ .map(|f| f.source_id)
+ .collect()
+ } else {
+ std::collections::HashSet::new()
+ };
+
+ let has_field_id_conflict = projected_iceberg_field_ids.iter().any(|field_id| {
Review Comment:
I'm confused about this part. I think the rule about missing columns is quite clear in the [Iceberg spec](https://iceberg.apache.org/spec/#column-projection:~:text=Values%20for%20field%20ids%20which%20are%20not%20present%20in%20a%20data%20file%20must%20be%20resolved%20according%20the%20following%20rules%3A); it's just a fallback chain for the lookup.
##########
crates/iceberg/src/scan/task.rs:
##########
@@ -54,6 +58,24 @@ pub struct FileScanTask {
/// The list of delete files that may need to be applied to this data file
pub deletes: Vec<FileScanTaskDeleteFile>,
+
+ /// Partition data from the manifest entry, used to identify which columns can use
+ /// constant values from partition metadata vs. reading from the data file.
+ /// Per the Iceberg spec, only identity-transformed partition fields should use constants.
+ #[serde(skip)]
+ pub partition: Option<Struct>,
Review Comment:
I'm fine with implementing serialization for `Struct` in a follow-up PR, but I don't think the default behavior should be skipping, which may lead to bugs. We could provide a function via the [serialize_with](https://serde.rs/field-attrs.html#serialize_with) attribute, and that function could just return a `NotImplemented` error for now.
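A minimal sketch of what I mean; the field and function names are illustrative, and serde's custom error is used here since `serialize_with` must produce an `S::Error`:

```rust
use serde::{Serialize, Serializer};

pub struct Struct; // placeholder for iceberg's `Struct` in this sketch

#[derive(Serialize)]
pub struct FileScanTask {
    // Fail loudly instead of silently dropping the field.
    #[serde(serialize_with = "serialize_partition_unimplemented")]
    pub partition: Option<Struct>,
}

fn serialize_partition_unimplemented<S>(
    _partition: &Option<Struct>,
    _serializer: S,
) -> Result<S::Ok, S::Error>
where
    S: Serializer,
{
    Err(serde::ser::Error::custom(
        "serialization of FileScanTask.partition is not implemented yet",
    ))
}
```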
##########
crates/iceberg/src/arrow/record_batch_transformer.rs:
##########
@@ -270,57 +394,199 @@ impl RecordBatchTransformer {
snapshot_schema: &IcebergSchema,
projected_iceberg_field_ids: &[i32],
field_id_to_mapped_schema_map: HashMap<i32, (FieldRef, usize)>,
+ constants_map: HashMap<i32, PrimitiveLiteral>,
+ partition_spec: Option<&PartitionSpec>,
) -> Result<Vec<ColumnSource>> {
let field_id_to_source_schema_map =
Self::build_field_id_to_arrow_schema_map(source_schema)?;
- projected_iceberg_field_ids.iter().map(|field_id|{
- let (target_field, _) = field_id_to_mapped_schema_map.get(field_id).ok_or(
- Error::new(ErrorKind::Unexpected, "could not find field in schema")
- )?;
- let target_type = target_field.data_type();
+ // Detect field ID conflicts that require name-based mapping.
+ //
+ // Conflicts occur when fields with initial_default (from add_files or schema evolution)
+ // have field IDs that match Parquet field IDs, but refer to different columns.
+ //
+ // Example from add_files (Hive table import):
+ // Parquet file field IDs: name=1, subdept=2
+ // Iceberg schema field IDs: id=1, name=2, dept=3, subdept=4
+ // Partition columns (id, dept) have initial_default values
+ //
+ // Without name-based fallback, we'd incorrectly:
+ // - Read partition column "id" (field_id=1) from Parquet field_id=1 (which is "name")
+ // - Read data column "name" (field_id=2) from Parquet field_id=2 (which is "subdept")
+ //
+ // The fix: When conflicts exist, use name-based mapping for data columns.
+ // This matches Java's behavior in add_files procedure.
+ //
+ // Note: We do NOT treat identity-partitioned fields as conflicts. For identity partitions,
+ // it's NORMAL for the source column to exist in Parquet - we just use the constant from
+ // partition metadata instead of reading the file.
+ //
+ // See: TestAddFilesProcedure.addDataPartitionedByIdAndDept() in iceberg-java
+
+ // Build a set of source field IDs used in NON-identity partition transforms.
+ // These are regular data columns that happen to be used for partitioning (e.g., bucket, truncate).
+ // They should be read from Parquet files normally, not treated as partition columns.
+ let non_identity_partition_source_ids: std::collections::HashSet<i32> =
+ if let Some(spec) = partition_spec {
+ spec.fields()
+ .iter()
+ .filter(|f| !matches!(f.transform, Transform::Identity))
+ .map(|f| f.source_id)
+ .collect()
+ } else {
+ std::collections::HashSet::new()
+ };
+
+ let has_field_id_conflict = projected_iceberg_field_ids.iter().any(|field_id| {
+ let field = snapshot_schema.field_by_id(*field_id);
+ let has_initial_default = field.and_then(|f| f.initial_default.as_ref()).is_some();
+ let in_source_schema = field_id_to_source_schema_map.contains_key(field_id);
+ let in_constants = constants_map.contains_key(field_id);
+ let is_non_identity_partition_source = non_identity_partition_source_ids.contains(field_id);
+
+ // A field ID conflict occurs when:
+ // 1. Field has initial_default (from add_files or schema evolution)
+ // 2. Field exists in Parquet by field ID
+ // 3. Field is NOT an identity-partitioned column (those use constants)
+ // 4. Field is NOT a source for non-identity partitioning (bucket/truncate/etc - these are data columns)
+ has_initial_default
+ && in_source_schema
+ && !in_constants
+ && !is_non_identity_partition_source
+ });
+
+ // Build name-based mapping if there's a field ID conflict
+ let name_to_source_schema_map: HashMap<String, (FieldRef, usize)> = source_schema
Review Comment:
This is incorrect; the Iceberg spec says we should use the default name mapping (`schema.name-mapping.default`) from table metadata, rather than deriving a mapping from the file schema like this.
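A rough sketch of consulting the table's default name mapping instead. This is simplified: iceberg-rust's actual name-mapping types and parsing will differ, and the JSON parsing is stubbed out here.

```rust
use std::collections::HashMap;

/// Simplified: maps a column name as written in the data file to an Iceberg field id.
struct NameMapping {
    name_to_field_id: HashMap<String, i32>,
}

impl NameMapping {
    /// The mapping comes from the `schema.name-mapping.default` table property,
    /// whose value is a JSON document per the spec. Parsing is stubbed here.
    fn from_table_properties(props: &HashMap<String, String>) -> Option<Self> {
        let _json = props.get("schema.name-mapping.default")?;
        // ... parse the JSON mapping into (name -> field id) entries ...
        Some(NameMapping {
            name_to_field_id: HashMap::new(),
        })
    }

    fn field_id_for(&self, column_name: &str) -> Option<i32> {
        self.name_to_field_id.get(column_name).copied()
    }
}
```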