liurenjie1024 commented on code in PR #1821:
URL: https://github.com/apache/iceberg-rust/pull/1821#discussion_r2493636085


##########
crates/iceberg/src/scan/task.rs:
##########
@@ -62,20 +81,29 @@ pub struct FileScanTask {
     /// Partition data from the manifest entry, used to identify which columns 
can use
     /// constant values from partition metadata vs. reading from the data file.
     /// Per the Iceberg spec, only identity-transformed partition fields 
should use constants.
-    #[serde(skip)]
+    #[serde(default)]
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(serialize_with = "serialize_not_implemented")]
+    #[serde(deserialize_with = "deserialize_not_implemented")]
     pub partition: Option<Struct>,
 
-    /// The partition spec ID for this file, required to look up the correct
-    /// partition spec and determine which fields are identity-transformed.
-    /// Not serialized as partition data is runtime-only and populated from 
manifest entries.
-    #[serde(skip)]
-    pub partition_spec_id: Option<i32>,
-
     /// The partition spec for this file, used to distinguish identity 
transforms
     /// (which use partition metadata constants) from non-identity transforms 
like
     /// bucket/truncate (which must read source columns from the data file).
-    #[serde(skip)]
+    #[serde(default)]
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(serialize_with = "serialize_not_implemented")]
+    #[serde(deserialize_with = "deserialize_not_implemented")]

Review Comment:
   It's not related to this PR, but this is why I don't like pub fields in 
structs. Adding a field requires changing a lot of unrelated things. This is 
also error-prone, since this partition spec is supposed to be the one 
associated with the data file, not the default table partition spec.



##########
crates/iceberg/src/arrow/record_batch_transformer.rs:
##########
@@ -270,88 +371,233 @@ impl RecordBatchTransformer {
         snapshot_schema: &IcebergSchema,
         projected_iceberg_field_ids: &[i32],
         field_id_to_mapped_schema_map: HashMap<i32, (FieldRef, usize)>,
+        constants_map: HashMap<i32, PrimitiveLiteral>,
+        _partition_spec: Option<&PartitionSpec>,
+        name_mapping: Option<&NameMapping>,
     ) -> Result<Vec<ColumnSource>> {
         let field_id_to_source_schema_map =
             Self::build_field_id_to_arrow_schema_map(source_schema)?;
 
-        projected_iceberg_field_ids.iter().map(|field_id|{
-            let (target_field, _) = 
field_id_to_mapped_schema_map.get(field_id).ok_or(
-                Error::new(ErrorKind::Unexpected, "could not find field in 
schema")
-            )?;
-            let target_type = target_field.data_type();
+        // Build name-based map for spec rule #2 (name mapping)
+        // This allows us to find Parquet columns by name when field IDs are 
missing/conflicting
+        let field_name_to_source_schema_map =
+            Self::build_field_name_to_arrow_schema_map(source_schema);
 
-            Ok(if let Some((source_field, source_index)) = 
field_id_to_source_schema_map.get(field_id) {
-                // column present in source
+        projected_iceberg_field_ids
+            .iter()
+            .map(|field_id| {
+                let (target_field, _) =
+                    field_id_to_mapped_schema_map
+                        .get(field_id)
+                        .ok_or(Error::new(
+                            ErrorKind::Unexpected,
+                            "could not find field in schema",
+                        ))?;
+                let target_type = target_field.data_type();
 
-                if source_field.data_type().equals_datatype(target_type) {
-                    // no promotion required
-                    ColumnSource::PassThrough {
-                        source_index: *source_index
-                    }
-                } else {
-                    // promotion required
-                    ColumnSource::Promote {
+                let iceberg_field = 
snapshot_schema.field_by_id(*field_id).ok_or(Error::new(
+                    ErrorKind::Unexpected,
+                    "Field not found in snapshot schema",
+                ))?;
+
+                // Determine how to source this column per Iceberg spec 
"Column Projection" rules.
+                //
+                // Per the spec 
(https://iceberg.apache.org/spec/#column-projection):
+                // "Values for field ids which are not present in a data file 
must be resolved
+                // according the following rules:"
+                //
+                // 1. "Return the value from partition metadata if an Identity 
Transform exists
+                //     for the field and the partition value is present in the 
partition struct
+                //     on data_file object in the manifest."
+                // 2. "Use schema.name-mapping.default metadata to map field 
id to columns
+                //     without field id as described below and use the column 
if it is present."
+                // 3. "Return the default value if it has a defined 
initial-default"
+                // 4. "Return null in all other cases"
+
+                let column_source = if let Some(constant_value) = 
constants_map.get(field_id) {

Review Comment:
   This should not be the first step. According to the projection rule, this 
check should only happen after the lookup by field id has failed.



##########
crates/iceberg/src/arrow/record_batch_transformer.rs:
##########
@@ -270,88 +371,233 @@ impl RecordBatchTransformer {
         snapshot_schema: &IcebergSchema,
         projected_iceberg_field_ids: &[i32],
         field_id_to_mapped_schema_map: HashMap<i32, (FieldRef, usize)>,
+        constants_map: HashMap<i32, PrimitiveLiteral>,
+        _partition_spec: Option<&PartitionSpec>,
+        name_mapping: Option<&NameMapping>,
     ) -> Result<Vec<ColumnSource>> {
         let field_id_to_source_schema_map =
             Self::build_field_id_to_arrow_schema_map(source_schema)?;
 
-        projected_iceberg_field_ids.iter().map(|field_id|{
-            let (target_field, _) = 
field_id_to_mapped_schema_map.get(field_id).ok_or(
-                Error::new(ErrorKind::Unexpected, "could not find field in 
schema")
-            )?;
-            let target_type = target_field.data_type();
+        // Build name-based map for spec rule #2 (name mapping)
+        // This allows us to find Parquet columns by name when field IDs are 
missing/conflicting
+        let field_name_to_source_schema_map =
+            Self::build_field_name_to_arrow_schema_map(source_schema);
 
-            Ok(if let Some((source_field, source_index)) = 
field_id_to_source_schema_map.get(field_id) {
-                // column present in source
+        projected_iceberg_field_ids
+            .iter()
+            .map(|field_id| {
+                let (target_field, _) =
+                    field_id_to_mapped_schema_map
+                        .get(field_id)
+                        .ok_or(Error::new(
+                            ErrorKind::Unexpected,
+                            "could not find field in schema",
+                        ))?;
+                let target_type = target_field.data_type();
 
-                if source_field.data_type().equals_datatype(target_type) {
-                    // no promotion required
-                    ColumnSource::PassThrough {
-                        source_index: *source_index
-                    }
-                } else {
-                    // promotion required
-                    ColumnSource::Promote {
+                let iceberg_field = 
snapshot_schema.field_by_id(*field_id).ok_or(Error::new(
+                    ErrorKind::Unexpected,
+                    "Field not found in snapshot schema",
+                ))?;
+
+                // Determine how to source this column per Iceberg spec 
"Column Projection" rules.
+                //
+                // Per the spec 
(https://iceberg.apache.org/spec/#column-projection):
+                // "Values for field ids which are not present in a data file 
must be resolved
+                // according the following rules:"
+                //
+                // 1. "Return the value from partition metadata if an Identity 
Transform exists
+                //     for the field and the partition value is present in the 
partition struct
+                //     on data_file object in the manifest."
+                // 2. "Use schema.name-mapping.default metadata to map field 
id to columns
+                //     without field id as described below and use the column 
if it is present."
+                // 3. "Return the default value if it has a defined 
initial-default"
+                // 4. "Return null in all other cases"
+
+                let column_source = if let Some(constant_value) = 
constants_map.get(field_id) {
+                    // Spec rule #1: Identity-partitioned column - use 
constant from partition metadata
+                    ColumnSource::Add {
+                        value: Some(constant_value.clone()),
                         target_type: target_type.clone(),
-                        source_index: *source_index,
                     }
-                }
-            } else {
-                // column must be added
-                let iceberg_field = 
snapshot_schema.field_by_id(*field_id).ok_or(
-                    Error::new(ErrorKind::Unexpected, "Field not found in 
snapshot schema")
-                )?;
-
-                let default_value = if let Some(iceberg_default_value) =
-                    &iceberg_field.initial_default
-                {
-                    let Literal::Primitive(primitive_literal) = 
iceberg_default_value else {
-                        return Err(Error::new(
-                            ErrorKind::Unexpected,
-                            format!("Default value for column must be 
primitive type, but encountered {iceberg_default_value:?}")
-                        ));
-                    };
-                    Some(primitive_literal.clone())
                 } else {
-                    None
+                    // Check if field ID exists in Parquet, but verify it's 
the CORRECT field.
+                    //
+                    // Per the Iceberg spec 
(https://iceberg.apache.org/spec/#column-projection):
+                    // "Values for field ids which are NOT PRESENT in a data 
file must be resolved
+                    // according the following rules..."
+                    //
+                    // The key insight: In add_files scenarios (Hive table 
migrations), Parquet files
+                    // may have field IDs that conflict with the Iceberg 
schema's field IDs:
+                    //
+                    // Example:
+                    // - Parquet file written with: field_id=1→"name", 
field_id=2→"dept"
+                    // - Imported via add_files with: field_id=1→"id" 
(partition), field_id=2→"name", field_id=3→"dept"
+                    //
+                    // When we look for Iceberg field_id=1 ("id"), we find a 
field_id=1 in the Parquet file,
+                    // but it's the WRONG field (it's "name", not "id"). The 
correct field (id=1, name="id")
+                    // is NOT PRESENT in the Parquet file - it only exists as 
partition metadata.
+                    //
+                    // Per the spec: when a field is "not present", we should 
apply rules 2-4 (name mapping,
+                    // initial-default, or null).
+                    //
+                    // Java's approach (ParquetSchemaUtil.applyNameMapping, 
ReadConf.java lines 83-85):
+                    // Java REWRITES the Parquet schema's field IDs based on 
names BEFORE projection:
+                    // ```java
+                    // if (nameMapping != null) {
+                    //   typeWithIds = 
ParquetSchemaUtil.applyNameMapping(fileSchema, nameMapping);
+                    //   // Now field IDs match Iceberg schema based on name 
mapping
+                    //   this.projection = 
ParquetSchemaUtil.pruneColumns(typeWithIds, expectedSchema);
+                    // }
+                    // ```
+                    //
+                    // Our approach achieves the same result but detects 
conflicts DURING projection:
+                    // - When name mapping is present, field ID matches alone 
aren't sufficient
+                    // - We verify the field NAME also matches to ensure it's 
the correct field
+                    // - If names don't match, we treat the field as "not 
present" and use name mapping
+                    let field_by_id = 
field_id_to_source_schema_map.get(field_id).and_then(
+                        |(source_field, source_index)| {
+                            let name_matches = source_field.name() == 
&iceberg_field.name;
+
+                            if name_mapping.is_some() && !name_matches {

Review Comment:
   We don't need this check, do we? I think if we have already found a field 
by id, then we should just use that column.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to