mbutrovich commented on code in PR #1821:
URL: https://github.com/apache/iceberg-rust/pull/1821#discussion_r2499713931
##########
crates/iceberg/src/arrow/record_batch_transformer.rs:
##########
@@ -270,88 +371,233 @@ impl RecordBatchTransformer {
snapshot_schema: &IcebergSchema,
projected_iceberg_field_ids: &[i32],
field_id_to_mapped_schema_map: HashMap<i32, (FieldRef, usize)>,
+ constants_map: HashMap<i32, PrimitiveLiteral>,
+ _partition_spec: Option<&PartitionSpec>,
+ name_mapping: Option<&NameMapping>,
) -> Result<Vec<ColumnSource>> {
let field_id_to_source_schema_map =
Self::build_field_id_to_arrow_schema_map(source_schema)?;
- projected_iceberg_field_ids.iter().map(|field_id|{
- let (target_field, _) =
field_id_to_mapped_schema_map.get(field_id).ok_or(
- Error::new(ErrorKind::Unexpected, "could not find field in
schema")
- )?;
- let target_type = target_field.data_type();
+ // Build name-based map for spec rule #2 (name mapping)
+ // This allows us to find Parquet columns by name when field IDs are missing/conflicting
+ let field_name_to_source_schema_map =
+ Self::build_field_name_to_arrow_schema_map(source_schema);
- Ok(if let Some((source_field, source_index)) =
field_id_to_source_schema_map.get(field_id) {
- // column present in source
+ projected_iceberg_field_ids
+ .iter()
+ .map(|field_id| {
+ let (target_field, _) =
+ field_id_to_mapped_schema_map
+ .get(field_id)
+ .ok_or(Error::new(
+ ErrorKind::Unexpected,
+ "could not find field in schema",
+ ))?;
+ let target_type = target_field.data_type();
- if source_field.data_type().equals_datatype(target_type) {
- // no promotion required
- ColumnSource::PassThrough {
- source_index: *source_index
- }
- } else {
- // promotion required
- ColumnSource::Promote {
+ let iceberg_field =
snapshot_schema.field_by_id(*field_id).ok_or(Error::new(
+ ErrorKind::Unexpected,
+ "Field not found in snapshot schema",
+ ))?;
+
+ // Determine how to source this column per Iceberg spec "Column Projection" rules.
+ //
+ // Per the spec (https://iceberg.apache.org/spec/#column-projection):
+ // "Values for field ids which are not present in a data file must be resolved
+ // according the following rules:"
+ //
+ // 1. "Return the value from partition metadata if an Identity Transform exists
+ // for the field and the partition value is present in the partition struct
+ // on data_file object in the manifest."
+ // 2. "Use schema.name-mapping.default metadata to map field id to columns
+ // without field id as described below and use the column if it is present."
+ // 3. "Return the default value if it has a defined initial-default"
+ // 4. "Return null in all other cases"
+
+ let column_source = if let Some(constant_value) =
constants_map.get(field_id) {
+ // Spec rule #1: Identity-partitioned column - use constant from partition metadata
+ ColumnSource::Add {
+ value: Some(constant_value.clone()),
target_type: target_type.clone(),
- source_index: *source_index,
}
- }
- } else {
- // column must be added
- let iceberg_field =
snapshot_schema.field_by_id(*field_id).ok_or(
- Error::new(ErrorKind::Unexpected, "Field not found in
snapshot schema")
- )?;
-
- let default_value = if let Some(iceberg_default_value) =
- &iceberg_field.initial_default
- {
- let Literal::Primitive(primitive_literal) =
iceberg_default_value else {
- return Err(Error::new(
- ErrorKind::Unexpected,
- format!("Default value for column must be
primitive type, but encountered {iceberg_default_value:?}")
- ));
- };
- Some(primitive_literal.clone())
} else {
- None
+ // Check if field ID exists in Parquet, but verify it's the CORRECT field.
+ //
+ // Per the Iceberg spec (https://iceberg.apache.org/spec/#column-projection):
+ // "Values for field ids which are NOT PRESENT in a data file must be resolved
+ // according the following rules..."
+ //
+ // The key insight: In add_files scenarios (Hive table migrations), Parquet files
+ // may have field IDs that conflict with the Iceberg schema's field IDs:
+ //
+ // Example:
+ // - Parquet file written with: field_id=1→"name", field_id=2→"dept"
+ // - Imported via add_files with: field_id=1→"id" (partition), field_id=2→"name", field_id=3→"dept"
+ //
+ // When we look for Iceberg field_id=1 ("id"), we find a field_id=1 in the Parquet file,
+ // but it's the WRONG field (it's "name", not "id"). The correct field (id=1, name="id")
+ // is NOT PRESENT in the Parquet file - it only exists as partition metadata.
+ //
+ // Per the spec: when a field is "not present", we should apply rules 2-4 (name mapping,
+ // initial-default, or null).
+ //
+ // Java's approach (ParquetSchemaUtil.applyNameMapping, ReadConf.java lines 83-85):
+ // Java REWRITES the Parquet schema's field IDs based on names BEFORE projection:
+ // ```java
+ // if (nameMapping != null) {
+ // typeWithIds = ParquetSchemaUtil.applyNameMapping(fileSchema, nameMapping);
+ // // Now field IDs match Iceberg schema based on name mapping
+ // this.projection = ParquetSchemaUtil.pruneColumns(typeWithIds, expectedSchema);
+ // }
+ // ```
+ //
+ // Our approach achieves the same result but detects conflicts DURING projection:
+ // - When name mapping is present, field ID matches alone aren't sufficient
+ // - We verify the field NAME also matches to ensure it's the correct field
+ // - If names don't match, we treat the field as "not present" and use name mapping
+ let field_by_id =
field_id_to_source_schema_map.get(field_id).and_then(
+ |(source_field, source_index)| {
+ let name_matches = source_field.name() ==
&iceberg_field.name;
+
+ if name_mapping.is_some() && !name_matches {
Review Comment:
See response below.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]