This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push: new ab70ddc8d5e9 [SPARK-52267][SQL] Match field ID in ParquetToSparkSchemaConverter ab70ddc8d5e9 is described below commit ab70ddc8d5e971da7bb820dee9e8df1f813d9c0a Author: Chenhao Li <chenhao...@databricks.com> AuthorDate: Tue May 27 16:19:47 2025 +0800 [SPARK-52267][SQL] Match field ID in ParquetToSparkSchemaConverter ### What changes were proposed in this pull request? In the vectorized Parquet reader, there are two classes to resolve the Parquet schema when reading a Parquet file: - `ParquetReadSupport`: it clips the Parquet schema to only include the necessary part used by the Spark requested schema. The matching considers both field name and ID. - `ParquetToSparkSchemaConverter`: it resolves the Parquet schema to a Spark type by connecting it to the Spark requested schema. The matching only considers field name. When the field ID matches but the field name doesn't, the first step will clip the Parquet schema to the same structure as the Spark requested schema as expected. In the second step, the Parquet type cannot be connected to a Spark type in the requested schema, and it will be inferred as a Spark type. It will usually work as expected if the inferred type is the same as the requested type. But it is possible that they are different and the read is still valid. For example, if the Parquet ty [...] This can happen in real use cases if an Iceberg table with both rename and change column type (int -> long) operations is converted into a Delta table. This situation may be very rare, though. This PR fixes the bug by matching field ID in `ParquetToSparkSchemaConverter` when the name cannot be matched. I know that `ParquetReadSupport` gives priority to field ID when it exists, but I am not fully confident about this change and would like to keep the semantic change minimal. ### Why are the changes needed? It fixes a correctness issue. ### Does this PR introduce _any_ user-facing change? 
Yes, as stated above. ### How was this patch tested? Unit test. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #50990 from chenhao-db/ParquetToSparkSchemaConverter_fieldId. Authored-by: Chenhao Li <chenhao...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit cf6c26ada2ebc55aa7a65c2f175423017876fece) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../parquet/ParquetSchemaConverter.scala | 28 ++++++++++++++++++--- .../parquet/ParquetFieldIdIOSuite.scala | 29 +++++++++++++++++++++- 2 files changed, 53 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala index daeb8e88a924..fcc8e76f73ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala @@ -57,21 +57,25 @@ class ParquetToSparkSchemaConverter( assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get, caseSensitive: Boolean = SQLConf.CASE_SENSITIVE.defaultValue.get, inferTimestampNTZ: Boolean = SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.defaultValue.get, - nanosAsLong: Boolean = SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.defaultValue.get) { + nanosAsLong: Boolean = SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.defaultValue.get, + useFieldId: Boolean = SQLConf.PARQUET_FIELD_ID_READ_ENABLED.defaultValue.get) { def this(conf: SQLConf) = this( assumeBinaryIsString = conf.isParquetBinaryAsString, assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp, caseSensitive = conf.caseSensitiveAnalysis, inferTimestampNTZ = conf.parquetInferTimestampNTZEnabled, - nanosAsLong = conf.legacyParquetNanosAsLong) + nanosAsLong = conf.legacyParquetNanosAsLong, + 
useFieldId = conf.parquetFieldIdReadEnabled) def this(conf: Configuration) = this( assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean, assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean, caseSensitive = conf.get(SQLConf.CASE_SENSITIVE.key).toBoolean, inferTimestampNTZ = conf.get(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key).toBoolean, - nanosAsLong = conf.get(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key).toBoolean) + nanosAsLong = conf.get(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key).toBoolean, + useFieldId = conf.getBoolean(SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key, + SQLConf.PARQUET_FIELD_ID_READ_ENABLED.defaultValue.get)) /** * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]]. @@ -107,11 +111,29 @@ class ParquetToSparkSchemaConverter( val schemaMapOpt = sparkReadSchema.map { schema => schema.map(f => normalizeFieldName(f.name) -> f).toMap } + // Use ID mapping only when the name mapping doesn't find a match. 
+ lazy val schemaIdMapOpt = sparkReadSchema match { + case Some(schema) if useFieldId => + Some(schema.fields.flatMap { f => + if (ParquetUtils.hasFieldId(f)) { + Some((ParquetUtils.getFieldId(f), f)) + } else { + None + } + }.toMap) + case _ => None + } val converted = (0 until groupColumn.getChildrenCount).map { i => val field = groupColumn.getChild(i) val fieldFromReadSchema = schemaMapOpt.flatMap { schemaMap => schemaMap.get(normalizeFieldName(field.getName)) + }.orElse { + val parquetFieldId = Option(field.getType.getId).map(_.intValue()) + (parquetFieldId, schemaIdMapOpt) match { + case (Some(id), Some(map)) => map.get(id) + case _ => None + } } var fieldReadType = fieldFromReadSchema.map(_.dataType) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala index f5e854a79979..a33868fc82af 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.SparkException import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.types.{ArrayType, IntegerType, MapType, Metadata, MetadataBuilder, StringType, StructType} +import org.apache.spark.sql.types._ class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkSession { @@ -239,4 +239,31 @@ class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkS } } } + + test("SPARK-52267: Field ID mapping when field name doesn't match") { + withTempDir { dir => + val readSchema = new StructType().add("id1", LongType, true, withId(1)) + val writeSchema = new StructType().add("id2", IntegerType, true, withId(1)) + 
+ withSQLConf(SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key -> "true") { + val writeData = Seq(Row(1), Row(2), Row(3)) + spark.createDataFrame(writeData.asJava, writeSchema) + .write.mode("overwrite").parquet(dir.getCanonicalPath) + } + + withAllParquetReaders { + withSQLConf(SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "false") { + checkAnswer(spark.read.schema(readSchema).parquet(dir.getCanonicalPath), + Seq(Row(null), Row(null), Row(null))) + } + // Without the fix, the result is unpredictable when PARQUET_FIELD_ID_READ_ENABLED is + // enabled. It could cause NPE if OnHeapColumnVector is used in the scan. It could produce + // incorrect results if OffHeapColumnVector is used. + withSQLConf(SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "true") { + checkAnswer(spark.read.schema(readSchema).parquet(dir.getCanonicalPath), + Seq(Row(1L), Row(2L), Row(3L))) + } + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org