This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 79c223950947 [SPARK-56931][SQL] Support complex constant metadata in row materialization path
79c223950947 is described below
commit 79c223950947ed6c6f19c15fec9816d54c1bbc42
Author: Matt Zhang <[email protected]>
AuthorDate: Wed May 20 20:57:16 2026 +0800
[SPARK-56931][SQL] Support complex constant metadata in row materialization path
### What changes were proposed in this pull request?
Follow-up to SPARK-56844, which allowed `ArrayType` / `MapType` /
`StructType` in `FileSourceMetadataAttribute` and added the matching
branches to `ColumnVectorUtils.populate` for the columnar metadata path.
That covered file scans returning `ColumnarBatch`. For scans that produce
row-form output (text, JSON, CSV, or any reader with `Batched=false`), the
metadata row is filled via
`FileFormat.updateMetadataInternalRow` ->
`FileFormat.getFileConstantMetadataColumnValue` ->
`Literal(extractor.apply(file))`.
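The lookup order on this path can be sketched in plain Scala. This is a simplified model, not Spark code: `SimpleFile`, `metadataValue`, and `fillRow` are hypothetical stand-ins for `PartitionedFile`, `getFileConstantMetadataColumnValue`, and `updateMetadataInternalRow`, and an `Array[Any]` stands in for the mutable metadata row. The `Literal` wrapping step is deliberately left out.

```scala
// Hypothetical stand-in for PartitionedFile: a path plus the per-file constant metadata map.
final case class SimpleFile(path: String, otherConstantMetadataColumnValues: Map[String, Any])

// Models the lookup in getFileConstantMetadataColumnValue: use a registered extractor if one
// exists, otherwise fall back to the file's metadata map (null when the name is absent).
def metadataValue(
    name: String,
    file: SimpleFile,
    extractors: Map[String, SimpleFile => Any]): Any = {
  val extractor: SimpleFile => Any = extractors.getOrElse(
    name,
    (f: SimpleFile) => f.otherConstantMetadataColumnValues.get(name).orNull)
  extractor(file)
}

// Models updateMetadataInternalRow: fill one slot per requested field, in order.
// A null value simply lands as a null slot (the real code calls row.setNullAt(i)).
def fillRow(
    row: Array[Any],
    fieldNames: Seq[String],
    file: SimpleFile,
    extractors: Map[String, SimpleFile => Any]): Array[Any] = {
  require(row.length >= fieldNames.length, "row is too short for the requested fields")
  fieldNames.zipWithIndex.foreach { case (name, i) =>
    row(i) = metadataValue(name, file, extractors)
  }
  row
}
```

An extractor always wins over the metadata map, and a name found in neither place yields null rather than an error.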
`Literal.apply(Any)` dispatches on the value class and has no case for
`ArrayData`, `MapData`, or `InternalRow`, so a complex constant metadata
column trips `UNSUPPORTED_FEATURE.LITERAL_TYPE` before the row is even
populated. Separately, `SchemaPruning.sortLeftFieldsByRight` recurses
through the metadata schema and prunes nested struct fields inside an
array/map/struct subfield. That is correct for data files (the reader
projects the requested columns) but wrong for constant metadata, where
each subfield's value is produced whole by a single extractor: pruning
drops struct fields from the schema while the extracted value still
contains them, so catalyst field ordinals no longer line up with the value.
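The difference between inferring a literal's type from the value's runtime class and having the caller pass the type can be sketched with a toy model. All names here (`DataType`, `ArrayData`, `RowData`, `Lit`, `literalFromValue`, `literalWithType`) are hypothetical simplifications of Catalyst's `Literal.apply(Any)` and `Literal.create(value, dataType)`; the sketch performs no conversion and no value/type validation.

```scala
// Hypothetical, simplified stand-ins for Catalyst types and values (not Spark's classes).
sealed trait DataType
case object NullType extends DataType
case object IntType extends DataType
case object StringType extends DataType
final case class ArrayType(element: DataType) extends DataType
final case class StructType(fields: Seq[(String, DataType)]) extends DataType

// Catalyst-form containers: like ArrayData / InternalRow, they do not record element types.
final case class ArrayData(values: Seq[Any])
final case class RowData(values: Seq[Any])

final case class Lit(value: Any, dataType: DataType)

// Models Literal.apply(Any): the type must be inferred from the runtime class alone,
// so a container with no recorded element type cannot be handled.
def literalFromValue(v: Any): Lit = v match {
  case null => Lit(null, NullType)
  case i: Int => Lit(i, IntType)
  case s: String => Lit(s, StringType)
  case other => throw new UnsupportedOperationException(
    s"unsupported literal type: ${other.getClass.getSimpleName}")
}

// Models the fixed path: unwrap an already-built literal, then attach the caller's type.
def literalWithType(v: Any, dataType: DataType): Lit = v match {
  case Lit(inner, _) => Lit(inner, dataType)
  case other => Lit(other, dataType)
}
```

The column's declared type is the only place the element type of an `array<struct<...>>` value is known, which is why the fix threads it down to the literal construction.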
This PR:
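- Threads the column's `DataType` through
`FileFormat.getFileConstantMetadataColumnValue` and
`updateMetadataInternalRow`. The value now goes through
`Literal.create(value, dataType)`, which accepts catalyst-form values
directly; an extractor that returns an already-built `Literal` has its
value unwrapped first. The new parameter is required, and the in-tree
callers in `FileFormat` and `FileScanRDD` are updated to pass each
field's type. Extractors that return primitive values keep working
unchanged.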
- Adds `SchemaPruning.pruneMetadataSchema` and uses it in place of
`pruneSchema` for the file-source metadata schema, so nested types inside
a `FileSourceMetadataAttribute` are no longer recursed into by
`sortLeftFieldsByRight`. The metadata attribute's top struct can still
have unused sibling sub-attributes pruned (each is a separate
extractor), but each kept sub-attribute's data type is preserved
verbatim. Non-metadata data file pruning behavior is unchanged.
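The metadata pruning rule can be modeled on a toy schema. This sketch assumes a hypothetical schema representation (`DType`, `Field`, `Struct`, `Arr`) and a `requested` map from metadata attribute name to referenced sub-attribute names in place of Spark's `RootField` tree; case-insensitive comparison stands in for `conf.resolver`. It illustrates the rule only: sibling sub-attributes are dropped, kept ones are copied unchanged.

```scala
// Hypothetical toy schema model (not Spark's types).
sealed trait DType
case object Str extends DType
final case class Arr(element: DType) extends DType
final case class Struct(fields: Seq[Field]) extends DType
final case class Field(name: String, dataType: DType)

// Models the metadata pruning rule. `metadata` holds one top-level field per metadata
// attribute; `requested` maps an attribute name to the sub-attribute names the query uses.
def pruneMetadata(metadata: Struct, requested: Map[String, Set[String]]): Struct = {
  def sameName(a: String, b: String): Boolean = a.equalsIgnoreCase(b)
  Struct(metadata.fields.map { top =>
    top.dataType match {
      case Struct(subFields) =>
        // Only the sub-attribute level is pruned; nothing below it is inspected.
        val wanted: Set[String] =
          requested.filter { case (attr, _) => sameName(attr, top.name) }.values.flatten.toSet
        val kept = subFields.filter(sub => wanted.exists(n => sameName(n, sub.name)))
        // Kept sub-attributes are copied as-is, so their inner types stay intact.
        if (kept.length == subFields.length) top else top.copy(dataType = Struct(kept))
      case _ => top
    }
  })
}
```

With `requested = Map("_metadata" -> Set("perms", "location"))`, an unused `owner` sub-attribute is dropped while `perms` keeps both `email` and `role` in its element struct, even if the query reads only `role`.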
### Why are the changes needed?
Without this, a file format that registers a constant metadata column
with a complex type (e.g. `array<struct<...>>`) can be read on the
columnar path but fails at runtime on the row path, and even on the
columnar path the schema-pruning rewriter can shift element struct ordinals.
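The ordinal shift can be shown without Spark. In this hypothetical sketch, the extractor's value for a `location` struct is a plain `IndexedSeq` laid out as (country, city), and `readField` resolves a field name to an ordinal using whichever schema the reader was handed; neither is a Spark API.

```scala
// Hypothetical model: a struct value is an IndexedSeq whose positions follow the schema
// the extractor was written against; a reader maps a field name to an ordinal using the
// (possibly pruned) schema it was handed.
def readField(value: IndexedSeq[Any], schemaFieldNames: Seq[String], name: String): Any = {
  val ordinal = schemaFieldNames.indexOf(name)
  require(ordinal >= 0, s"$name is not in the schema")
  value(ordinal)
}

// The extractor always produces the complete value (country, city).
val extractedLocation: IndexedSeq[Any] = IndexedSeq("US", "SFO")

// Preserved schema: `city` resolves to ordinal 1.
val preserved = readField(extractedLocation, Seq("country", "city"), "city")
// Over-pruned schema: `city` resolves to ordinal 0, but the value was not pruned,
// so the read returns the country instead.
val shifted = readField(extractedLocation, Seq("city"), "city")
```

No error is raised in the over-pruned case; the query silently returns the wrong field, which is why the failure mode is easy to miss.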
### Does this PR introduce _any_ user-facing change?
No. No current OSS code path exposes a complex constant metadata column.
### How was this patch tested?
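A new `FileSourceCustomMetadataStructSuite` case reads `array<struct>`
and `struct` constant metadata columns with the vectorized Parquet reader
disabled, covering both the row-path fix and the metadata-attribute
preservation rule. Existing `SchemaPruningSuite` and
`FileMetadataStructSuite` tests verify that non-metadata and
sibling-pruning behavior is unchanged.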
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude (Anthropic)
Closes #55962 from mzhang/mzhang/SPARK-56931-constant-metadata-row-path.
Lead-authored-by: Matt Zhang <[email protected]>
Co-authored-by: Matt Zhang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../catalyst/expressions/SchemaPruningSuite.scala | 1 +
.../sql/execution/datasources/FileFormat.scala | 35 ++++++++++---
.../sql/execution/datasources/FileScanRDD.scala | 8 +--
.../sql/execution/datasources/SchemaPruning.scala | 40 ++++++++++++++-
.../FileSourceCustomMetadataStructSuite.scala | 57 +++++++++++++++++++++-
5 files changed, 129 insertions(+), 12 deletions(-)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruningSuite.scala
index b64bc49f9544..a968526a89f1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruningSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruningSuite.scala
@@ -143,4 +143,5 @@ class SchemaPruningSuite extends SparkFunSuite with SQLHelper {
val prunedSchema = SchemaPruning.pruneSchema(schema, rootFields)
assert(prunedSchema.head.metadata.getString("foo") == "bar")
}
+
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index 8a254b464da7..42ed6d782e34 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -231,6 +231,11 @@ trait FileFormat {
*
* NOTE: Extractors are lazy, invoked only if the query actually selects their column at runtime.
*
+ * Return types: extractors may return either a raw value (which is converted to the column's
+ * catalyst form via [[Literal.create]]) or an already-built [[Literal]] (whose `.value` is
+ * used directly). For complex types ([[ArrayType]] / [[MapType]] / [[StructType]]), return the
+ * value in catalyst form ([[ArrayData]] / [[MapData]] / [[InternalRow]]).
+ *
* See also [[FileFormat.getFileConstantMetadataColumnValue]].
*/
def fileConstantMetadataExtractors: Map[String, PartitionedFile => Any] =
@@ -273,6 +278,9 @@ object FileFormat {
FileSourceConstantMetadataStructField(FILE_BLOCK_LENGTH, LongType, nullable = false),
FileSourceConstantMetadataStructField(FILE_MODIFICATION_TIME, TimestampType, nullable = false))
+ private val BASE_METADATA_NAME_TO_TYPE: Map[String, DataType] =
+ BASE_METADATA_FIELDS.map(f => f.name -> f.dataType).toMap
+
/**
* All [[BASE_METADATA_FIELDS]] require custom extractors because they are derived directly from
* fields of the [[PartitionedFile]], and do have entries in the file's metadata map.
@@ -299,16 +307,26 @@ object FileFormat {
* If an extractor is available, apply it. Otherwise, look up the column's name in the file's
* column value map and return the result (or null, if not found).
*
- * Raw values (including null) are automatically converted to literals as a courtesy.
+ * Raw values (including null) are converted via [[Literal.create]], which accepts catalyst-form
+ * values directly. This lets a complex constant metadata column return an [[ArrayData]] /
+ * [[MapData]] / [[InternalRow]] whose element types only the caller knows. If the extractor
+ * returns an already-built [[Literal]] (allowed by the extractor contract), its value is
+ * unwrapped before delegating to [[Literal.create]] so the dataType validation in the
+ * case-class constructor is checked against the raw value.
*/
def getFileConstantMetadataColumnValue(
name: String,
file: PartitionedFile,
- metadataExtractors: Map[String, PartitionedFile => Any]): Literal = {
+ metadataExtractors: Map[String, PartitionedFile => Any],
+ dataType: DataType): Literal = {
val extractor = metadataExtractors.getOrElse(name,
{ pf: PartitionedFile => pf.otherConstantMetadataColumnValues.get(name).orNull }
)
- Literal(extractor.apply(file))
+ val rawValue = extractor.apply(file) match {
+ case lit: Literal => lit.value
+ case other => other
+ }
+ Literal.create(rawValue, dataType)
}
// create an internal row given required metadata fields and file information
@@ -334,7 +352,9 @@ object FileFormat {
modificationTime = fileModificationTime,
fileSize = fileSize,
otherConstantMetadataColumnValues = Map.empty)
- updateMetadataInternalRow(new GenericInternalRow(fieldNames.length), fieldNames, pf, extractors)
+ val fieldDataTypes = fieldNames.map(BASE_METADATA_NAME_TO_TYPE)
+ updateMetadataInternalRow(
+ new GenericInternalRow(fieldNames.length), fieldNames, pf, extractors, fieldDataTypes)
}
// update an internal row given required metadata fields and file information
@@ -342,9 +362,12 @@ object FileFormat {
row: InternalRow,
fieldNames: Seq[String],
file: PartitionedFile,
- metadataExtractors: Map[String, PartitionedFile => Any]): InternalRow = {
+ metadataExtractors: Map[String, PartitionedFile => Any],
+ fieldDataTypes: Seq[DataType]): InternalRow = {
+ require(fieldDataTypes.length == fieldNames.length,
+ s"fieldDataTypes length ${fieldDataTypes.length} != fieldNames length ${fieldNames.length}")
fieldNames.zipWithIndex.foreach { case (name, i) =>
- getFileConstantMetadataColumnValue(name, file, metadataExtractors) match {
+ getFileConstantMetadataColumnValue(name, file, metadataExtractors, fieldDataTypes(i)) match {
case Literal(null, _) => row.setNullAt(i)
case literal => row.update(i, literal.value)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index ac1e87de863e..b591573c00af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -33,7 +33,6 @@ import org.apache.spark.rdd.{InputFileBlockHolder, RDD}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericInternalRow, JoinedRow, Literal, UnsafeProjection, UnsafeRow}
-import org.apache.spark.sql.catalyst.types.PhysicalDataType
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.datasources.FileFormat._
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
@@ -173,7 +172,8 @@ class FileScanRDD(
private def updateMetadataRow(): Unit =
if (metadataColumns.nonEmpty && currentFile != null) {
updateMetadataInternalRow(
- metadataRow, metadataColumns.map(_.name), currentFile, metadataExtractors)
+ metadataRow, metadataColumns.map(_.name), currentFile, metadataExtractors,
+ metadataColumns.map(_.dataType))
}
/**
@@ -183,11 +183,11 @@ class FileScanRDD(
val tmpRow = new GenericInternalRow(1)
metadataColumns.map { attr =>
// Populate each metadata column by passing the resulting value through `tmpRow`.
- getFileConstantMetadataColumnValue(attr.name, currentFile, metadataExtractors) match {
+ getFileConstantMetadataColumnValue(
+ attr.name, currentFile, metadataExtractors, attr.dataType) match {
case Literal(null, _) =>
tmpRow.setNullAt(0)
case literal =>
- require(PhysicalDataType(attr.dataType) == PhysicalDataType(literal.dataType))
tmpRow.update(0, literal.value)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala
index 1b23fd1a5e82..202599a819d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala
@@ -82,7 +82,7 @@ object SchemaPruning extends Rule[LogicalPlan] {
val metadataSchema =
relation.output.collect { case FileSourceMetadataAttribute(attr) => attr }.toStructType
val prunedMetadataSchema = if (metadataSchema.nonEmpty) {
- pruneSchema(metadataSchema, requestedRootFields)
+ pruneMetadataSchema(metadataSchema, requestedRootFields)
} else {
metadataSchema
}
@@ -114,6 +114,44 @@ object SchemaPruning extends Rule[LogicalPlan] {
fsRelation.fileFormat.isInstanceOf[ParquetFileFormat] ||
fsRelation.fileFormat.isInstanceOf[OrcFileFormat])
+ /**
+ * Prunes a file-source metadata schema (one `StructType` containing each
+ * `FileSourceMetadataAttribute`). Unlike pruning a data file schema, this only prunes
+ * unused sibling sub-attributes (each is its own per-field extractor); kept sub-attributes'
+ * data types are preserved verbatim because the extractor produces a complete catalyst
+ * value, and shaving fields out would shift positions in that value.
+ */
+ private def pruneMetadataSchema(
+ metadataSchema: StructType,
+ requestedRootFields: Seq[RootField]): StructType = {
+ val resolver = conf.resolver
+ StructType(metadataSchema.map { topField =>
+ topField.dataType match {
+ case innerStruct: StructType =>
+ // Collect the requested sub-attribute names for this metadata attribute from the
+ // root field tree. Anything below those sub-attributes (e.g. nested struct/array
+ // element fields) is ignored, since extractor outputs aren't pruned.
+ val requestedSubNames: Set[String] = requestedRootFields.collect {
+ case rf if resolver(rf.field.name, topField.name) =>
+ rf.field.dataType match {
+ case rs: StructType => rs.fieldNames.toSet
+ case _ => Set.empty[String]
+ }
+ }.flatten.toSet
+ val keptSubFields = innerStruct.fields.filter { sub =>
+ requestedSubNames.exists(name => resolver(name, sub.name))
+ }
+ if (keptSubFields.length == innerStruct.fields.length) {
+ // Nothing to prune for this attribute; keep the original.
+ topField
+ } else {
+ topField.copy(dataType = StructType(keptSubFields))
+ }
+ case _ => topField
+ }
+ })
+ }
+
/**
* Normalizes the names of the attribute references in the given expressions
to reflect
* the names in the given logical relation. This makes it possible to
compare attributes and
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala
index 8f0a000f4517..1ace87deab47 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala
@@ -24,11 +24,13 @@ import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, FileSourceConstantMetadataStructField, FileSourceGeneratedMetadataStructField, Literal}
+import org.apache.spark.sql.catalyst.util.GenericArrayData
import org.apache.spark.sql.classic.{DataFrame, Dataset}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.functions.{col, lit, when}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{ArrayType, IntegerType, LongType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String
/** Verifies the ability for a FileFormat to define custom metadata types */
@@ -336,6 +338,59 @@ class FileSourceCustomMetadataStructSuite extends SharedSparkSession {
}
}
+ test("[SPARK-56931] complex constant metadata fields (array<struct>, struct) on row path") {
+ withTempData("parquet", FILE_SCHEMA) { (_, f0, f1) =>
+ val permElement = StructType(Seq(
+ StructField("email", StringType),
+ StructField("role", StringType)))
+ val locationStruct = StructType(Seq(
+ StructField("country", StringType),
+ StructField("city", StringType)))
+ val complexFields = Seq(
+ FileSourceConstantMetadataStructField("perms", ArrayType(permElement, containsNull = true)),
+ FileSourceConstantMetadataStructField("location", locationStruct))
+ val format = new TestFileFormat(complexFields)
+
+ // Build per-file values in catalyst form.
+ def perms(email: String, role: String): InternalRow =
+ InternalRow(UTF8String.fromString(email), UTF8String.fromString(role))
+ def loc(country: String, city: String): InternalRow =
+ InternalRow(UTF8String.fromString(country), UTF8String.fromString(city))
+
+ val files = Seq(
+ FileStatusWithMetadata(f0, Map(
+ "perms" -> new GenericArrayData(Array[Any](perms("a@x", "r"), perms("b@x", "w"))),
+ "location" -> loc("US", "SFO"))),
+ FileStatusWithMetadata(f1, Map(
+ "perms" -> new GenericArrayData(Array[Any](perms("c@x", "r"), perms("d@x", "o"))),
+ "location" -> loc("CA", "YYZ"))))
+ val df = createDF(format, files)
+
+ // Force the row materialization path (Batched=false) so we exercise the
+ // updateMetadataInternalRow -> getFileConstantMetadataColumnValue -> Literal.create
+ // change end-to-end. The query touches a subset of each subfield to also exercise
+ // the metadata-schema pruning preservation rule.
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
+ // Query only the non-first sub-fields of each complex column. A buggy implementation
+ // that pruned the kept sub-attribute's inner schema down to only the queried fields
+ // would surface here: the extractor still produces `InternalRow("US", "SFO")` /
+ // `InternalRow(email, role)`, and reading the kept field at the pruned (now zero)
+ // ordinal would yield the index-0 value instead of the index-1 value.
+ checkAnswer(
+ df.selectExpr(
+ "fileNum",
+ "_metadata.perms[1].role AS second_role",
+ "_metadata.location.city AS city",
+ "size(_metadata.perms) AS perms_count"),
+ Seq(
+ Row(0, "w", "SFO", 2),
+ Row(0, "w", "SFO", 2),
+ Row(1, "o", "YYZ", 2),
+ Row(1, "o", "YYZ", 2)))
+ }
+ }
+ }
+
test("generated columns and extractors take precedence over metadata map values") {
withTempData("parquet", FILE_SCHEMA) { (_, f0, f1) =>
import FileFormat.{FILE_NAME, FILE_SIZE}