This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new 98999004d9f3 [SPARK-56931][SQL] Support complex constant metadata in row materialization path
98999004d9f3 is described below

commit 98999004d9f363618035fd4634c15463bc192eca
Author: Matt Zhang <[email protected]>
AuthorDate: Wed May 20 20:57:16 2026 +0800

    [SPARK-56931][SQL] Support complex constant metadata in row materialization path
    
    ### What changes were proposed in this pull request?
    
    Follow-up to SPARK-56844, which allowed `ArrayType` / `MapType` / `StructType`
    in `FileSourceMetadataAttribute` and added the matching branches to
    `ColumnVectorUtils.populate` for the columnar metadata path.
    
    That covered file scans returning `ColumnarBatch`. For scans that produce
    row-form output (text, JSON, CSV, or any reader with `Batched=false`), the
    metadata row is filled via
    `FileFormat.updateMetadataInternalRow` ->
    `FileFormat.getFileConstantMetadataColumnValue` ->
    `Literal(extractor.apply(file))`.
    
    `Literal.apply(Any)` dispatches on the value class and has no case for
    `ArrayData`, `MapData`, or `InternalRow`, so a complex constant metadata
    column trips `UNSUPPORTED_FEATURE.LITERAL_TYPE` before the row is even
    populated. Separately, `SchemaPruning.sortLeftFieldsByRight` recurses
    through the metadata schema and prunes nested struct fields inside an
    array/map/struct subfield. That is correct for data files (the reader
    projects the requested columns) but wrong for constant metadata, where
    each subfield's value is produced whole by a single extractor; pruning
    shaves catalyst row positions out from under the extractor.
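    
    The first failure can be modeled without Spark. The sketch below is a
    toy, not Spark's `Literal`: `ToyType`, `ToyLiteral`, `infer`, `create`,
    and `constantValue` are hypothetical names. It assumes only what is
    described above: one entry point infers the type from the value's
    class and has no case for complex values, while the other takes the
    type from the caller.
    
    ```scala
    object TypedLiteralSketch {
      // Hypothetical stand-ins for catalyst data types.
      sealed trait ToyType
      case object ToyString extends ToyType
      case object ToyLong extends ToyType
      final case class ToyArray(element: ToyType) extends ToyType
    
      // Hypothetical stand-in for a literal: a value plus its declared type.
      final case class ToyLiteral(value: Any, dataType: ToyType)
    
      // Infers the type from the runtime class. Complex values have no case,
      // so they are rejected before any row is populated.
      def infer(value: Any): ToyLiteral = value match {
        case s: String => ToyLiteral(s, ToyString)
        case l: Long => ToyLiteral(l, ToyLong)
        case other =>
          throw new IllegalArgumentException(s"no literal case for ${other.getClass.getName}")
      }
    
      // The caller supplies the type, so no class-based dispatch is needed.
      def create(value: Any, dataType: ToyType): ToyLiteral = ToyLiteral(value, dataType)
    
      // An extractor may hand back a raw value or an already-built literal;
      // unwrap the latter, then build the literal with the declared type.
      def constantValue(extracted: Any, dataType: ToyType): ToyLiteral = {
        val raw = extracted match {
          case lit: ToyLiteral => lit.value
          case other => other
        }
        create(raw, dataType)
      }
    
      def main(args: Array[String]): Unit = {
        val perms = Vector("a@x", "b@x") // toy stand-in for a complex value
        println(scala.util.Try(infer(perms)).isFailure)
        println(constantValue(perms, ToyArray(ToyString)))
        println(constantValue(ToyLiteral(7L, ToyLong), ToyLong))
      }
    }
    ```
    
    In the toy, the class-dispatching `infer` rejects the vector, while
    `constantValue` accepts it because the declared type travels with the
    call.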
    
    This PR:
    - Threads the column's `DataType` through
      `FileFormat.getFileConstantMetadataColumnValue` and
      `updateMetadataInternalRow`. The value goes through
      `Literal.create(value, dataType)`, which accepts catalyst-form values
      directly. Both methods take the type as a required argument, and the
      call sites in `FileFormat` and `FileScanRDD` pass it from the
      metadata columns.
    - Adds `SchemaPruning.pruneMetadataSchema`, used in place of
      `pruneSchema` for the metadata schema, to preserve subfield data
      types inside a `FileSourceMetadataAttribute`. The metadata
      attribute's top struct can still have unused sibling sub-attributes
      pruned (each is a separate extractor), but anything below that level
      is preserved verbatim. Non-metadata data file pruning behavior is
      unchanged.
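    
    The pruning rule in the second bullet can be sketched as a toy model.
    This is an illustration, not the Spark implementation: `ToyField`,
    `ToyStruct`, `pruneSiblingsOnly`, and the `location` / `file_name`
    fields are hypothetical, and names are matched case-sensitively here,
    whereas the real code goes through the session resolver.
    
    ```scala
    object MetadataPruningSketch {
      // Hypothetical stand-ins for catalyst schema types.
      sealed trait ToyType
      case object ToyString extends ToyType
      final case class ToyField(name: String, dataType: ToyType)
      final case class ToyStruct(fields: Vector[ToyField]) extends ToyType {
        def ordinalOf(name: String): Int = fields.indexWhere(_.name == name)
      }
    
      // Drop unrequested sub-attributes of the top metadata struct, but leave
      // the data type of every kept sub-attribute untouched.
      def pruneSiblingsOnly(metadata: ToyStruct, requested: Set[String]): ToyStruct =
        ToyStruct(metadata.fields.filter(f => requested.contains(f.name)))
    
      val location: ToyStruct =
        ToyStruct(Vector(ToyField("country", ToyString), ToyField("city", ToyString)))
      val metadata: ToyStruct =
        ToyStruct(Vector(ToyField("file_name", ToyString), ToyField("location", location)))
    
      // The extractor always produces the complete value, addressed by position.
      val extractedLocation: Vector[String] = Vector("US", "SFO")
    
      // Look up `city` through the schema that sibling-only pruning leaves behind.
      def cityAfterPruning(): String = {
        val pruned = pruneSiblingsOnly(metadata, Set("location"))
        pruned.fields.head.dataType match {
          case kept: ToyStruct => extractedLocation(kept.ordinalOf("city"))
          case other => sys.error(s"unexpected type: $other")
        }
      }
    
      def main(args: Array[String]): Unit = {
        println(cityAfterPruning()) // prints SFO: the ordinal still matches the value
        // Nested pruning would leave only `city`, moving it to ordinal 0.
        val nestedPruned = ToyStruct(Vector(ToyField("city", ToyString)))
        println(extractedLocation(nestedPruned.ordinalOf("city"))) // prints US
      }
    }
    ```
    
    The second lookup shows the failure mode being avoided: the schema
    says `city` is at position 0, but the extractor's value still has
    `country` there.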
    
    ### Why are the changes needed?
    
    Without this, a file format that registers a constant metadata column
    with a complex type (e.g. `array<struct<...>>`) can be read columnar
    but fails at runtime on the row path, and even on the columnar path
    the schema-pruning rewriter can shift element struct ordinals.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. No current OSS code path exposes a complex constant metadata column.
    
    ### How was this patch tested?
    
    New `SchemaPruningSuite` case covers the metadata-attribute preservation
    rule. Existing `SchemaPruningSuite` and `FileMetadataStructSuite` tests
    verify the non-metadata and sibling-pruning behavior is unchanged.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude (Anthropic)
    
    Closes #55962 from mzhang/mzhang/SPARK-56931-constant-metadata-row-path.
    
    Lead-authored-by: Matt Zhang <[email protected]>
    Co-authored-by: Matt Zhang <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit 79c223950947ed6c6f19c15fec9816d54c1bbc42)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../catalyst/expressions/SchemaPruningSuite.scala  |  1 +
 .../sql/execution/datasources/FileFormat.scala     | 35 ++++++++++---
 .../sql/execution/datasources/FileScanRDD.scala    |  8 +--
 .../sql/execution/datasources/SchemaPruning.scala  | 40 ++++++++++++++-
 .../FileSourceCustomMetadataStructSuite.scala      | 57 +++++++++++++++++++++-
 5 files changed, 129 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruningSuite.scala
index b64bc49f9544..a968526a89f1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruningSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruningSuite.scala
@@ -143,4 +143,5 @@ class SchemaPruningSuite extends SparkFunSuite with SQLHelper {
     val prunedSchema = SchemaPruning.pruneSchema(schema, rootFields)
     assert(prunedSchema.head.metadata.getString("foo") == "bar")
   }
+
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index 8a254b464da7..42ed6d782e34 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -231,6 +231,11 @@ trait FileFormat {
    *
    * NOTE: Extractors are lazy, invoked only if the query actually selects their column at runtime.
    *
+   * Return types: extractors may return either a raw value (which is converted to the column's
+   * catalyst form via [[Literal.create]]) or an already-built [[Literal]] (whose `.value` is
+   * used directly). For complex types ([[ArrayType]] / [[MapType]] / [[StructType]]), return the
+   * value in catalyst form ([[ArrayData]] / [[MapData]] / [[InternalRow]]).
+   *
    * See also [[FileFormat.getFileConstantMetadataColumnValue]].
    */
   def fileConstantMetadataExtractors: Map[String, PartitionedFile => Any] =
@@ -273,6 +278,9 @@ object FileFormat {
     FileSourceConstantMetadataStructField(FILE_BLOCK_LENGTH, LongType, nullable = false),
     FileSourceConstantMetadataStructField(FILE_MODIFICATION_TIME, TimestampType, nullable = false))
 
+  private val BASE_METADATA_NAME_TO_TYPE: Map[String, DataType] =
+    BASE_METADATA_FIELDS.map(f => f.name -> f.dataType).toMap
+
   /**
    * All [[BASE_METADATA_FIELDS]] require custom extractors because they are derived directly from
    * fields of the [[PartitionedFile]], and do have entries in the file's metadata map.
@@ -299,16 +307,26 @@ object FileFormat {
    * If an extractor is available, apply it. Otherwise, look up the column's name in the file's
    * column value map and return the result (or null, if not found).
    *
-   * Raw values (including null) are automatically converted to literals as a courtesy.
+   * Raw values (including null) are converted via [[Literal.create]], which accepts catalyst-form
+   * values directly. This lets a complex constant metadata column return an [[ArrayData]] /
+   * [[MapData]] / [[InternalRow]] whose element types only the caller knows. If the extractor
+   * returns an already-built [[Literal]] (allowed by the extractor contract), its value is
+   * unwrapped before delegating to [[Literal.create]] so the dataType validation in the
+   * case-class constructor is checked against the raw value.
    */
   def getFileConstantMetadataColumnValue(
       name: String,
       file: PartitionedFile,
-      metadataExtractors: Map[String, PartitionedFile => Any]): Literal = {
+      metadataExtractors: Map[String, PartitionedFile => Any],
+      dataType: DataType): Literal = {
     val extractor = metadataExtractors.getOrElse(name,
       { pf: PartitionedFile => pf.otherConstantMetadataColumnValues.get(name).orNull }
     )
-    Literal(extractor.apply(file))
+    val rawValue = extractor.apply(file) match {
+      case lit: Literal => lit.value
+      case other => other
+    }
+    Literal.create(rawValue, dataType)
   }
 
   // create an internal row given required metadata fields and file information
@@ -334,7 +352,9 @@ object FileFormat {
       modificationTime = fileModificationTime,
       fileSize = fileSize,
       otherConstantMetadataColumnValues = Map.empty)
-    updateMetadataInternalRow(new GenericInternalRow(fieldNames.length), fieldNames, pf, extractors)
+    val fieldDataTypes = fieldNames.map(BASE_METADATA_NAME_TO_TYPE)
+    updateMetadataInternalRow(
+      new GenericInternalRow(fieldNames.length), fieldNames, pf, extractors, fieldDataTypes)
   }
 
   // update an internal row given required metadata fields and file information
@@ -342,9 +362,12 @@ object FileFormat {
       row: InternalRow,
       fieldNames: Seq[String],
       file: PartitionedFile,
-      metadataExtractors: Map[String, PartitionedFile => Any]): InternalRow = {
+      metadataExtractors: Map[String, PartitionedFile => Any],
+      fieldDataTypes: Seq[DataType]): InternalRow = {
+    require(fieldDataTypes.length == fieldNames.length,
+      s"fieldDataTypes length ${fieldDataTypes.length} != fieldNames length ${fieldNames.length}")
     fieldNames.zipWithIndex.foreach { case (name, i) =>
-      getFileConstantMetadataColumnValue(name, file, metadataExtractors) match {
+      getFileConstantMetadataColumnValue(name, file, metadataExtractors, fieldDataTypes(i)) match {
         case Literal(null, _) => row.setNullAt(i)
         case literal => row.update(i, literal.value)
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index ac1e87de863e..b591573c00af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -33,7 +33,6 @@ import org.apache.spark.rdd.{InputFileBlockHolder, RDD}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericInternalRow, JoinedRow, Literal, UnsafeProjection, UnsafeRow}
-import org.apache.spark.sql.catalyst.types.PhysicalDataType
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.datasources.FileFormat._
 import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
@@ -173,7 +172,8 @@ class FileScanRDD(
       private def updateMetadataRow(): Unit =
         if (metadataColumns.nonEmpty && currentFile != null) {
           updateMetadataInternalRow(
-            metadataRow, metadataColumns.map(_.name), currentFile, metadataExtractors)
+            metadataRow, metadataColumns.map(_.name), currentFile, metadataExtractors,
+            metadataColumns.map(_.dataType))
         }
 
       /**
@@ -183,11 +183,11 @@ class FileScanRDD(
         val tmpRow = new GenericInternalRow(1)
         metadataColumns.map { attr =>
           // Populate each metadata column by passing the resulting value through `tmpRow`.
-          getFileConstantMetadataColumnValue(attr.name, currentFile, metadataExtractors) match {
+          getFileConstantMetadataColumnValue(
+              attr.name, currentFile, metadataExtractors, attr.dataType) match {
             case Literal(null, _) =>
               tmpRow.setNullAt(0)
             case literal =>
-              require(PhysicalDataType(attr.dataType) == PhysicalDataType(literal.dataType))
               tmpRow.update(0, literal.value)
           }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala
index 1b23fd1a5e82..202599a819d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala
@@ -82,7 +82,7 @@ object SchemaPruning extends Rule[LogicalPlan] {
       val metadataSchema =
         relation.output.collect { case FileSourceMetadataAttribute(attr) => attr }.toStructType
       val prunedMetadataSchema = if (metadataSchema.nonEmpty) {
-        pruneSchema(metadataSchema, requestedRootFields)
+        pruneMetadataSchema(metadataSchema, requestedRootFields)
       } else {
         metadataSchema
       }
@@ -114,6 +114,44 @@ object SchemaPruning extends Rule[LogicalPlan] {
       fsRelation.fileFormat.isInstanceOf[ParquetFileFormat] ||
         fsRelation.fileFormat.isInstanceOf[OrcFileFormat])
 
+  /**
+   * Prunes a file-source metadata schema (one `StructType` containing each
+   * `FileSourceMetadataAttribute`). Unlike pruning a data file schema, this only prunes
+   * unused sibling sub-attributes (each is its own per-field extractor); kept sub-attributes'
+   * data types are preserved verbatim because the extractor produces a complete catalyst
+   * value, and shaving fields out would shift positions in that value.
+   */
+  private def pruneMetadataSchema(
+      metadataSchema: StructType,
+      requestedRootFields: Seq[RootField]): StructType = {
+    val resolver = conf.resolver
+    StructType(metadataSchema.map { topField =>
+      topField.dataType match {
+        case innerStruct: StructType =>
+          // Collect the requested sub-attribute names for this metadata attribute from the
+          // root field tree. Anything below those sub-attributes (e.g. nested struct/array
+          // element fields) is ignored, since extractor outputs aren't pruned.
+          val requestedSubNames: Set[String] = requestedRootFields.collect {
+            case rf if resolver(rf.field.name, topField.name) =>
+              rf.field.dataType match {
+                case rs: StructType => rs.fieldNames.toSet
+                case _ => Set.empty[String]
+              }
+          }.flatten.toSet
+          val keptSubFields = innerStruct.fields.filter { sub =>
+            requestedSubNames.exists(name => resolver(name, sub.name))
+          }
+          if (keptSubFields.length == innerStruct.fields.length) {
+            // Nothing to prune for this attribute; keep the original.
+            topField
+          } else {
+            topField.copy(dataType = StructType(keptSubFields))
+          }
+        case _ => topField
+      }
+    })
+  }
+
   /**
    * Normalizes the names of the attribute references in the given expressions to reflect
    * the names in the given logical relation. This makes it possible to compare attributes and
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala
index 8f0a000f4517..1ace87deab47 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala
@@ -24,11 +24,13 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Expression, FileSourceConstantMetadataStructField, FileSourceGeneratedMetadataStructField, Literal}
+import org.apache.spark.sql.catalyst.util.GenericArrayData
 import org.apache.spark.sql.classic.{DataFrame, Dataset}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.functions.{col, lit, when}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{ArrayType, IntegerType, LongType, StringType, StructField, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 
 /** Verifies the ability for a FileFormat to define custom metadata types */
@@ -336,6 +338,59 @@ class FileSourceCustomMetadataStructSuite extends SharedSparkSession {
     }
   }
 
+  test("[SPARK-56931] complex constant metadata fields (array<struct>, struct) on row path") {
+    withTempData("parquet", FILE_SCHEMA) { (_, f0, f1) =>
+      val permElement = StructType(Seq(
+        StructField("email", StringType),
+        StructField("role", StringType)))
+      val locationStruct = StructType(Seq(
+        StructField("country", StringType),
+        StructField("city", StringType)))
+      val complexFields = Seq(
+        FileSourceConstantMetadataStructField("perms", ArrayType(permElement, containsNull = true)),
+        FileSourceConstantMetadataStructField("location", locationStruct))
+      val format = new TestFileFormat(complexFields)
+
+      // Build per-file values in catalyst form.
+      def perms(email: String, role: String): InternalRow =
+        InternalRow(UTF8String.fromString(email), UTF8String.fromString(role))
+      def loc(country: String, city: String): InternalRow =
+        InternalRow(UTF8String.fromString(country), UTF8String.fromString(city))
+
+      val files = Seq(
+        FileStatusWithMetadata(f0, Map(
+          "perms" -> new GenericArrayData(Array[Any](perms("a@x", "r"), perms("b@x", "w"))),
+          "location" -> loc("US", "SFO"))),
+        FileStatusWithMetadata(f1, Map(
+          "perms" -> new GenericArrayData(Array[Any](perms("c@x", "r"), perms("d@x", "o"))),
+          "location" -> loc("CA", "YYZ"))))
+      val df = createDF(format, files)
+
+      // Force the row materialization path (Batched=false) so we exercise the
+      // updateMetadataInternalRow -> getFileConstantMetadataColumnValue -> Literal.create
+      // change end-to-end. The query touches a subset of each subfield to also exercise
+      // the metadata-schema pruning preservation rule.
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
+        // Query only the non-first sub-fields of each complex column. A buggy implementation
+        // that pruned the kept sub-attribute's inner schema down to only the queried fields
+        // would surface here: the extractor still produces `InternalRow("US", "SFO")` /
+        // `InternalRow(email, role)`, and reading the kept field at the pruned (now zero)
+        // ordinal would yield the index-0 value instead of the index-1 value.
+        checkAnswer(
+          df.selectExpr(
+            "fileNum",
+            "_metadata.perms[1].role AS second_role",
+            "_metadata.location.city AS city",
+            "size(_metadata.perms) AS perms_count"),
+          Seq(
+            Row(0, "w", "SFO", 2),
+            Row(0, "w", "SFO", 2),
+            Row(1, "o", "YYZ", 2),
+            Row(1, "o", "YYZ", 2)))
+      }
+    }
+  }
+
   test("generated columns and extractors take precedence over metadata map values") {
     withTempData("parquet", FILE_SCHEMA) { (_, f0, f1) =>
       import FileFormat.{FILE_NAME, FILE_SIZE}

