This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 7426d82c59b2 [SPARK-54686][SQL] Relax DSv2 table checks in temp views 
to allow new top-level columns
7426d82c59b2 is described below

commit 7426d82c59b2feddf7c1da507f2f340c4cc5a514
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Mon Dec 15 15:33:30 2025 -0800

    [SPARK-54686][SQL] Relax DSv2 table checks in temp views to allow new 
top-level columns
    
    ### What changes were proposed in this pull request?
    
    This PR relaxes DSv2 table checks in temp views to allow new top-level 
columns.
    
    ### Why are the changes needed?
    
    These changes are needed to avoid regressions introduced in upcoming 4.1. 
The newly added table checks are too strict. We should follow the SQL view 
behavior and allow new top-level columns.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes but for unreleased functionality.
    
    ### How was this patch tested?
    
    Existing + new tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #53450 from aokolnychyi/spark-54686.
    
    Authored-by: Anton Okolnychyi <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
    (cherry picked from commit 2a28bb01ae16d6164733ee741a3116c0f6d22827)
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../sql/catalyst/analysis/V2TableReference.scala   |  6 +-
 .../org/apache/spark/sql/util/SchemaUtils.scala    |  5 +-
 .../sql/connector/catalog/V2TableUtilSuite.scala   | 90 +++++++++++++++++++++-
 .../sql/connector/DataSourceV2DataFrameSuite.scala | 81 ++++++++++---------
 4 files changed, 139 insertions(+), 43 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/V2TableReference.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/V2TableReference.scala
index b6a2c6db6604..85c36d452b30 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/V2TableReference.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/V2TableReference.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.connector.catalog.V2TableUtil
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import 
org.apache.spark.sql.util.SchemaValidationMode.ALLOW_NEW_TOP_LEVEL_FIELDS
 import org.apache.spark.util.ArrayImplicits._
 
 /**
@@ -120,7 +121,10 @@ private[sql] object V2TableReferenceUtils extends 
SQLConfHelper {
       ctx: TemporaryViewContext): Unit = {
     val tableName = ref.identifier.toQualifiedNameParts(ref.catalog)
 
-    val dataErrors = V2TableUtil.validateCapturedColumns(table, 
ref.info.columns)
+    val dataErrors = V2TableUtil.validateCapturedColumns(
+      table,
+      ref.info.columns,
+      mode = ALLOW_NEW_TOP_LEVEL_FIELDS)
     if (dataErrors.nonEmpty) {
       throw QueryCompilationErrors.columnsChangedAfterViewWithPlanCreation(
         ctx.viewName,
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala
index 76c4d518df6f..58ababa04739 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala
@@ -29,7 +29,7 @@ import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdenti
 import org.apache.spark.sql.connector.expressions.{BucketTransform, 
FieldReference, NamedTransform, Transform}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.util.SchemaValidationMode.PROHIBIT_CHANGES
+import 
org.apache.spark.sql.util.SchemaValidationMode.{ALLOW_NEW_TOP_LEVEL_FIELDS, 
PROHIBIT_CHANGES}
 import org.apache.spark.util.ArrayImplicits._
 import org.apache.spark.util.SparkSchemaUtils
 
@@ -459,7 +459,7 @@ private[spark] object SchemaUtils {
           }
         }
 
-        if (mode == PROHIBIT_CHANGES) {
+        if (mode == PROHIBIT_CHANGES || (mode == ALLOW_NEW_TOP_LEVEL_FIELDS && 
colPath.nonEmpty)) {
           otherFieldsByName.foreach { case (normalizedName, otherField) =>
             if (!fieldsByName.contains(normalizedName)) {
               errors += s"${formatField(colPath, otherField)} has been added"
@@ -529,4 +529,5 @@ private[spark] sealed trait SchemaValidationMode
 private[spark] object SchemaValidationMode {
   case object PROHIBIT_CHANGES extends SchemaValidationMode
   case object ALLOW_NEW_FIELDS extends SchemaValidationMode
+  case object ALLOW_NEW_TOP_LEVEL_FIELDS extends SchemaValidationMode
 }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/V2TableUtilSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/V2TableUtilSuite.scala
index b4923b116937..a9e5668d7fef 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/V2TableUtilSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/V2TableUtilSuite.scala
@@ -26,7 +26,8 @@ import 
org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
-import org.apache.spark.sql.util.SchemaValidationMode.PROHIBIT_CHANGES
+import 
org.apache.spark.sql.util.SchemaValidationMode.{ALLOW_NEW_TOP_LEVEL_FIELDS, 
PROHIBIT_CHANGES}
+import org.apache.spark.sql.util.SchemaValidationMode
 import org.apache.spark.util.ArrayImplicits.SparkArrayOps
 
 class V2TableUtilSuite extends SparkFunSuite {
@@ -529,6 +530,86 @@ class V2TableUtilSuite extends SparkFunSuite {
     assert(errors.head == "`person`.`attrs`.`value` type has changed from INT 
to BIGINT")
   }
 
+  test("validateCapturedColumns - ALLOW_NEW_TOP_LEVEL_FIELDS allows top-level 
additions") {
+    val originCols = Array(
+      col("id", LongType, nullable = false),
+      col("name", StringType, nullable = true))
+    val currentCols = Array(
+      col("id", LongType, nullable = false),
+      col("name", StringType, nullable = true),
+      col("age", IntegerType, nullable = true))
+    val table = TestTableWithMetadataSupport("test", currentCols)
+
+    val errors = validateCapturedColumns(table, originCols, 
ALLOW_NEW_TOP_LEVEL_FIELDS)
+    assert(errors.isEmpty)
+  }
+
+  test("validateCapturedColumns - ALLOW_NEW_TOP_LEVEL_FIELDS prohibits nested 
additions") {
+    val originAddress = StructType(Seq(
+      StructField("street", StringType),
+      StructField("city", StringType)))
+    val originCols = Array(
+      col("id", LongType, nullable = false),
+      col("address", originAddress, nullable = true))
+    val currentAddress = StructType(Seq(
+      StructField("street", StringType),
+      StructField("city", StringType),
+      StructField("zipCode", StringType)))
+    val currentCols = Array(
+      col("id", LongType, nullable = false),
+      col("address", currentAddress, nullable = true))
+    val table = TestTableWithMetadataSupport("test", currentCols)
+
+    val errors = validateCapturedColumns(table, originCols, 
ALLOW_NEW_TOP_LEVEL_FIELDS)
+    assert(errors.size == 1)
+    assert(errors.head.contains("`address`.`zipCode` STRING has been added"))
+  }
+
+  test("validateCapturedColumns - ALLOW_NEW_TOP_LEVEL_FIELDS fails new nested 
fields in array") {
+    val originItem = StructType(Seq(
+      StructField("itemId", LongType),
+      StructField("itemName", StringType)))
+    val originCols = Array(
+      col("id", LongType, nullable = false),
+      col("items", ArrayType(originItem), nullable = true))
+    val currentItem = StructType(Seq(
+      StructField("itemId", LongType),
+      StructField("itemName", StringType),
+      StructField("price", IntegerType)))
+    val currentCols = Array(
+      col("id", LongType, nullable = false),
+      col("items", ArrayType(currentItem), nullable = true))
+    val table = TestTableWithMetadataSupport("test", currentCols)
+
+    val errors = V2TableUtil.validateCapturedColumns(
+      table,
+      originCols.toImmutableArraySeq,
+      mode = ALLOW_NEW_TOP_LEVEL_FIELDS)
+    assert(errors.size == 1)
+    assert(errors.head.contains("`items`.`element`.`price` INT has been 
added"))
+  }
+
+  test("validateCapturedColumns - ALLOW_NEW_TOP_LEVEL_FIELDS prohibits nested 
map additions") {
+    val originValue = StructType(Seq(
+      StructField("count", IntegerType),
+      StructField("status", StringType)))
+    val originCols = Array(
+      col("id", LongType, nullable = false),
+      col("metadata", MapType(StringType, originValue), nullable = true))
+    val currentValue = StructType(Seq(
+      StructField("count", IntegerType),
+      StructField("status", StringType),
+      StructField("timestamp", LongType)))
+    val currentCols = Array(
+      col("id", LongType, nullable = false),
+      col("metadata", MapType(StringType, currentValue), nullable = true))
+    val table = TestTableWithMetadataSupport("test", currentCols)
+
+    val errors = validateCapturedColumns(table, originCols, 
ALLOW_NEW_TOP_LEVEL_FIELDS)
+    assert(errors.size == 1)
+    assert(errors.head.contains("`metadata`.`value`.`timestamp` BIGINT has 
been added"))
+  }
+
   // simple table without metadata column support
   private case class TestTable(
       override val name: String,
@@ -555,8 +636,11 @@ class V2TableUtilSuite extends SparkFunSuite {
     override def metadataInJSON: String = "{}"
   }
 
-  private def validateCapturedColumns(table: Table, originCols: 
Array[Column]): Seq[String] = {
-    V2TableUtil.validateCapturedColumns(table, originCols.toImmutableArraySeq)
+  private def validateCapturedColumns(
+      table: Table,
+      originCols: Array[Column],
+      mode: SchemaValidationMode = PROHIBIT_CHANGES): Seq[String] = {
+    V2TableUtil.validateCapturedColumns(table, originCols.toImmutableArraySeq, 
mode)
   }
 
   private def col(name: String, dataType: DataType, nullable: Boolean): Column 
= {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
index cdb498c8f2bf..c3164b3428f9 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -1254,7 +1254,7 @@ class DataSourceV2DataFrameSuite
     }
   }
 
-  test("SPARK-54157: cached temp view detects schema changes after analysis") {
+  test("SPARK-54157: cached temp view allows top-level column additions") {
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
       sql(s"CREATE TABLE $t (id INT, data STRING) USING foo")
@@ -1268,15 +1268,9 @@ class DataSourceV2DataFrameSuite
       // change table schema after the view has been analyzed and cached
       sql(s"ALTER TABLE $t ADD COLUMN extra INT")
 
-      // execution should fail with column mismatch even though the view is 
cached
-      checkError(
-        exception = intercept[AnalysisException] { spark.table("v").collect() 
},
-        condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
-        parameters = Map(
-          "viewName" -> "`v`",
-          "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
-          "colType" -> "data",
-          "errors" -> "- `extra` INT has been added"))
+      // execution should succeed as top-level column additions are allowed
+      // the temp view captures the original columns just like SQL views
+      checkAnswer(spark.table("v"), Seq(Row(1, "a")))
     }
   }
 
@@ -1512,7 +1506,7 @@ class DataSourceV2DataFrameSuite
     }
   }
 
-  test("SPARK-53924: temp view on DSv2 table detects added columns") {
+  test("SPARK-53924: temp view on DSv2 table allows top-level column 
additions") {
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
       sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
@@ -1521,10 +1515,32 @@ class DataSourceV2DataFrameSuite
       spark.table(t).createOrReplaceTempView("v")
       checkAnswer(spark.table("v"), Seq.empty)
 
-      // add column to underlying table
+      // add top-level column to underlying table
       sql(s"ALTER TABLE $t ADD COLUMN age int")
 
-      // accessing temp view should detect schema change
+      // accessing temp view should succeed as top-level column additions are 
allowed
+      // view captures original columns
+      checkAnswer(spark.table("v"), Seq.empty)
+
+      // insert data to verify view still works correctly
+      sql(s"INSERT INTO $t VALUES (1, 'a', 25)")
+      checkAnswer(spark.table("v"), Seq(Row(1, "a")))
+    }
+  }
+
+  test("SPARK-53924: temp view on DSv2 table detects nested column additions") 
{
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id bigint, address STRUCT<street: STRING, city: 
STRING>) USING foo")
+
+      // create temp view using DataFrame API
+      spark.table(t).createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq.empty)
+
+      // add nested column to underlying table
+      sql(s"ALTER TABLE $t ADD COLUMN address.zipCode string")
+
+      // accessing temp view should detect schema change for nested additions
       checkError(
         exception = intercept[AnalysisException] { spark.table("v").collect() 
},
         condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
@@ -1532,7 +1548,7 @@ class DataSourceV2DataFrameSuite
           "viewName" -> "`v`",
           "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
           "colType" -> "data",
-          "errors" -> "- `age` INT has been added"))
+          "errors" -> "- `address`.`zipCode` STRING has been added"))
     }
   }
 
@@ -1620,13 +1636,13 @@ class DataSourceV2DataFrameSuite
   test("SPARK-53924: createOrReplaceTempView works after schema change") {
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
-      sql(s"CREATE TABLE $t (id bigint) USING foo")
+      sql(s"CREATE TABLE $t (id bigint, data STRING, extra INT) USING foo")
 
       spark.table(t).createOrReplaceTempView("v")
       checkAnswer(spark.table("v"), Seq.empty)
 
       // alter table
-      sql(s"ALTER TABLE $t ADD COLUMN data string")
+      sql(s"ALTER TABLE $t DROP COLUMN extra")
 
       // old view fails
       intercept[AnalysisException] { spark.table("v").collect() }
@@ -1658,22 +1674,16 @@ class DataSourceV2DataFrameSuite
       }.get
       assert(options.get("fakeOption") == "testValue")
 
-      // schema changes should still be detected
+      // add top-level column to underlying table
       sql(s"ALTER TABLE $t ADD COLUMN age int")
 
-      // accessing temp view should detect schema change
-      checkError(
-        exception = intercept[AnalysisException] { spark.table("v").collect() 
},
-        condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
-        parameters = Map(
-          "viewName" -> "`v`",
-          "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
-          "colType" -> "data",
-          "errors" -> "- `age` INT has been added"))
+      // accessing temp view should succeed as top-level column additions are 
allowed
+
+      checkAnswer(spark.table("v"), Seq.empty)
     }
   }
 
-  test("SPARK-53924: temp view on DSv2 table created using SQL with plan 
detects changes") {
+  test("SPARK-53924: temp view on DSv2 table created using SQL with plan and 
top-level additions") {
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
       withSQLConf(SQLConf.STORE_ANALYZED_PLAN_FOR_VIEW.key -> "true") {
@@ -1687,18 +1697,15 @@ class DataSourceV2DataFrameSuite
         val Some(view) = spark.sessionState.catalog.getRawTempView("v")
         assert(view.plan.isDefined)
 
-        // add column to underlying table
+        // add top-level column to underlying table
         sql(s"ALTER TABLE $t ADD COLUMN age int")
 
-        // accessing temp view should detect schema change
-        checkError(
-          exception = intercept[AnalysisException] { 
spark.table("v").collect() },
-          condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
-          parameters = Map(
-            "viewName" -> "`v`",
-            "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
-            "colType" -> "data",
-            "errors" -> "- `age` INT has been added"))
+        // accessing temp view should succeed as top-level column additions 
are allowed
+        checkAnswer(spark.table("v"), Seq.empty)
+
+        // insert data to verify view still works correctly
+        sql(s"INSERT INTO $t VALUES (1, 'a', 25)")
+        checkAnswer(spark.table("v"), Seq(Row(1, "a")))
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to