This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new ab70ddc8d5e9 [SPARK-52267][SQL] Match field ID in ParquetToSparkSchemaConverter
ab70ddc8d5e9 is described below

commit ab70ddc8d5e971da7bb820dee9e8df1f813d9c0a
Author: Chenhao Li <chenhao...@databricks.com>
AuthorDate: Tue May 27 16:19:47 2025 +0800

    [SPARK-52267][SQL] Match field ID in ParquetToSparkSchemaConverter
    
    ### What changes were proposed in this pull request?
    
    In the vectorized Parquet reader, there are two classes that resolve the Parquet schema when reading a Parquet file:

    - `ParquetReadSupport`: it clips the Parquet schema to include only the parts needed by the Spark requested schema. The matching considers both field name and field ID.
    - `ParquetToSparkSchemaConverter`: it resolves the Parquet schema to a Spark type by connecting it to the Spark requested schema. The matching only considers field name.
    
    When the field ID matches but the field name doesn't, the first step clips the Parquet schema to the same structure as the Spark requested schema, as expected. In the second step, however, the Parquet type cannot be connected to a Spark type in the requested schema, so a Spark type is inferred from it instead. This usually works as expected when the inferred type is the same as the requested type, but the two can differ while the read is still valid. For example, if the Parquet ty [...]
    
    This can happen in real user cases if an Iceberg table that has had both a column rename and a column type change (int -> long) is converted into a Delta table. This situation may be very rare, though.
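
    For illustration, here is a minimal sketch (not part of this patch) of how a read schema can carry Parquet field IDs: Spark keeps them in column metadata under the `parquet.field.id` key, which is what the ID-based matching relies on. The column name, type, and ID below are made up for the example.

    ```scala
    import org.apache.spark.sql.types._

    // Attach Parquet field ID 1 to a column the reader calls "id1" with type LONG,
    // even though the Parquet file may still name it "id2" and store it as INT32.
    val idMetadata = new MetadataBuilder().putLong("parquet.field.id", 1).build()
    val readSchema = new StructType().add("id1", LongType, nullable = true, idMetadata)
    ```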
    
    This PR fixes the bug by matching the field ID in `ParquetToSparkSchemaConverter` when the name cannot be matched. I know that `ParquetReadSupport` gives priority to the field ID when it exists, but I am not fully confident about making the same change here and would like to keep the semantic change minimal.
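
    A simplified sketch of the resulting lookup order (the actual change is in the `ParquetToSparkSchemaConverter` diff below; `schemaMap`, `schemaIdMap`, and `parquetFieldId` are stand-ins for the real variables):

    ```scala
    import org.apache.spark.sql.types.StructField

    // Try the (normalized) field name first; fall back to the Parquet field ID
    // only when the name lookup finds nothing.
    def resolveField(
        name: String,
        parquetFieldId: Option[Int],
        schemaMap: Map[String, StructField],
        schemaIdMap: Map[Int, StructField]): Option[StructField] = {
      schemaMap.get(name).orElse(parquetFieldId.flatMap(schemaIdMap.get))
    }
    ```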
    
    ### Why are the changes needed?
    
    It fixes a correctness issue.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, as stated above.
    
    ### How was this patch tested?
    
    Unit test.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #50990 from chenhao-db/ParquetToSparkSchemaConverter_fieldId.
    
    Authored-by: Chenhao Li <chenhao...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit cf6c26ada2ebc55aa7a65c2f175423017876fece)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../parquet/ParquetSchemaConverter.scala           | 28 ++++++++++++++++++---
 .../parquet/ParquetFieldIdIOSuite.scala            | 29 +++++++++++++++++++++-
 2 files changed, 53 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index daeb8e88a924..fcc8e76f73ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -57,21 +57,25 @@ class ParquetToSparkSchemaConverter(
     assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
     caseSensitive: Boolean = SQLConf.CASE_SENSITIVE.defaultValue.get,
     inferTimestampNTZ: Boolean = SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.defaultValue.get,
-    nanosAsLong: Boolean = SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.defaultValue.get) {
+    nanosAsLong: Boolean = SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.defaultValue.get,
+    useFieldId: Boolean = SQLConf.PARQUET_FIELD_ID_READ_ENABLED.defaultValue.get) {
 
   def this(conf: SQLConf) = this(
     assumeBinaryIsString = conf.isParquetBinaryAsString,
     assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
     caseSensitive = conf.caseSensitiveAnalysis,
     inferTimestampNTZ = conf.parquetInferTimestampNTZEnabled,
-    nanosAsLong = conf.legacyParquetNanosAsLong)
+    nanosAsLong = conf.legacyParquetNanosAsLong,
+    useFieldId = conf.parquetFieldIdReadEnabled)
 
   def this(conf: Configuration) = this(
     assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
     assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
     caseSensitive = conf.get(SQLConf.CASE_SENSITIVE.key).toBoolean,
     inferTimestampNTZ = conf.get(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key).toBoolean,
-    nanosAsLong = conf.get(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key).toBoolean)
+    nanosAsLong = conf.get(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key).toBoolean,
+    useFieldId = conf.getBoolean(SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key,
+      SQLConf.PARQUET_FIELD_ID_READ_ENABLED.defaultValue.get))
 
   /**
    * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].
@@ -107,11 +111,29 @@ class ParquetToSparkSchemaConverter(
     val schemaMapOpt = sparkReadSchema.map { schema =>
       schema.map(f => normalizeFieldName(f.name) -> f).toMap
     }
+    // Use ID mapping only when the name mapping doesn't find a match.
+    lazy val schemaIdMapOpt = sparkReadSchema match {
+      case Some(schema) if useFieldId =>
+        Some(schema.fields.flatMap { f =>
+          if (ParquetUtils.hasFieldId(f)) {
+            Some((ParquetUtils.getFieldId(f), f))
+          } else {
+            None
+          }
+        }.toMap)
+      case _ => None
+    }
 
     val converted = (0 until groupColumn.getChildrenCount).map { i =>
       val field = groupColumn.getChild(i)
       val fieldFromReadSchema = schemaMapOpt.flatMap { schemaMap =>
         schemaMap.get(normalizeFieldName(field.getName))
+      }.orElse {
+        val parquetFieldId = Option(field.getType.getId).map(_.intValue())
+        (parquetFieldId, schemaIdMapOpt) match {
+          case (Some(id), Some(map)) => map.get(id)
+          case _ => None
+        }
       }
       var fieldReadType = fieldFromReadSchema.map(_.dataType)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
index f5e854a79979..a33868fc82af 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{ArrayType, IntegerType, MapType, Metadata, MetadataBuilder, StringType, StructType}
+import org.apache.spark.sql.types._
 
 class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkSession  {
 
@@ -239,4 +239,31 @@ class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkS
       }
     }
   }
+
+  test("SPARK-52267: Field ID mapping when field name doesn't match") {
+    withTempDir { dir =>
+      val readSchema = new StructType().add("id1", LongType, true, withId(1))
+      val writeSchema = new StructType().add("id2", IntegerType, true, withId(1))
+
+      withSQLConf(SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key -> "true") {
+        val writeData = Seq(Row(1), Row(2), Row(3))
+        spark.createDataFrame(writeData.asJava, writeSchema)
+          .write.mode("overwrite").parquet(dir.getCanonicalPath)
+      }
+
+      withAllParquetReaders {
+        withSQLConf(SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "false") {
+          checkAnswer(spark.read.schema(readSchema).parquet(dir.getCanonicalPath),
+            Seq(Row(null), Row(null), Row(null)))
+        }
+        // Without the fix, the result is unpredictable when PARQUET_FIELD_ID_READ_ENABLED is
+        // enabled. It could cause NPE if OnHeapColumnVector is used in the scan. It could produce
+        // incorrect results if OffHeapColumnVector is used.
+        withSQLConf(SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "true") {
+          checkAnswer(spark.read.schema(readSchema).parquet(dir.getCanonicalPath),
+            Seq(Row(1L), Row(2L), Row(3L)))
+        }
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
