This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e886428cdfd3 [SPARK-54735][SQL] Properly preserve column comments in 
view with SCHEMA EVOLUTION
e886428cdfd3 is described below

commit e886428cdfd3f59232bc037e8665b70e75863d25
Author: Milan Dankovic <[email protected]>
AuthorDate: Mon Dec 22 17:56:42 2025 +0800

    [SPARK-54735][SQL] Properly preserve column comments in view with SCHEMA 
EVOLUTION
    
    ### What changes were proposed in this pull request?
    In this PR I propose a fix to avoid overwriting user-defined comments in views that have schema evolution enabled.
    
    ### Why are the changes needed?
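    When a user creates a view with `SCHEMA EVOLUTION` and then changes the comments on the view's columns, `DESCRIBE` shows the new comments. However, after a query is executed on the view, the comment values are reverted to the values from the underlying table. This happens because, in schema evolution mode, any comment mismatch between the view and its query output triggered a schema sync that copied the table's schema, including its comments, into the view.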
    
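    With the proposed fix, user-set view comments are preserved when the view schema is synced. Comments from the table are adopted only for columns whose name changed, or which are new, consistent with schema evolution semantics. The previous behavior can be restored by setting the internal config `spark.sql.view.schemaEvolution.preserveUserComments` to `false`.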
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    New unit tests in `PersistedViewTestSuite` (`SQLViewTestSuite.scala`), plus an updated golden file `view-schema-evolution.sql.out`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #53508 from miland-db/milan-dankovic_data/master.
    
    Authored-by: Milan Dankovic <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |   1 +
 .../org/apache/spark/sql/internal/SQLConf.scala    |  13 ++
 .../spark/sql/execution/datasources/rules.scala    |  46 +++++-
 .../results/view-schema-evolution.sql.out          |   4 +-
 .../spark/sql/execution/SQLViewTestSuite.scala     | 159 ++++++++++++++++++++-
 5 files changed, 216 insertions(+), 7 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 7e576cf76f4d..801b494e06de 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -235,6 +235,7 @@ object AnalysisContext {
 object Analyzer {
   // List of configurations that should be passed on when resolving views and 
SQL UDF.
   private val RETAINED_ANALYSIS_FLAGS = Seq(
+    "spark.sql.view.schemaEvolution.preserveUserComments",
     // retainedHiveConfigs
     // TODO: remove these Hive-related configs after the `RelationConversions` 
is moved to
     // optimization phase.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 24b283df81c9..413f127f5fd7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2188,6 +2188,16 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val VIEW_SCHEMA_EVOLUTION_PRESERVE_USER_COMMENTS =
+    buildConf("spark.sql.view.schemaEvolution.preserveUserComments")
+      .internal()
+      .doc("When enabled, views with SCHEMA EVOLUTION mode will preserve 
user-set view comments " +
+        "when the underlying table schema evolves. When disabled, view 
comments will be " +
+        "overwritten with table comments on every schema sync.")
+      .version("4.2.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val OUTPUT_COMMITTER_CLASS = 
buildConf("spark.sql.sources.outputCommitterClass")
     .version("1.4.0")
     .internal()
@@ -7485,6 +7495,9 @@ class SQLConf extends Serializable with Logging with 
SqlApiConf {
 
   def viewSchemaCompensation: Boolean = getConf(VIEW_SCHEMA_COMPENSATION)
 
+  def viewSchemaEvolutionPreserveUserComments: Boolean =
+    getConf(VIEW_SCHEMA_EVOLUTION_PRESERVE_USER_COMMENTS)
+
   def defaultCacheStorageLevel: StorageLevel =
     StorageLevel.fromString(getConf(DEFAULT_CACHE_STORAGE_LEVEL).name())
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 060d7fe72c0a..b8e0627dfc43 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -39,7 +39,7 @@ import 
org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1}
 import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.InsertableRelation
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
 import org.apache.spark.sql.util.PartitioningUtils.normalizePartitionSpec
 import org.apache.spark.sql.util.SchemaUtils
 import org.apache.spark.util.ArrayImplicits._
@@ -677,6 +677,19 @@ case class QualifyLocationWithWarehouse(catalog: 
SessionCatalog) extends Rule[Lo
  * It does so by walking the resolved plan looking for View operators for 
persisted views.
  */
 object ViewSyncSchemaToMetaStore extends (LogicalPlan => Unit) {
+
+  /**
+   * Checks if comment changes between view and table should trigger schema 
sync.
+   * When preserveUserComments flag is enabled, comment differences should NOT 
trigger sync
+   * because we want to preserve user-set view comments.
+   */
+  private def shouldTriggerRedoOnCommentChange(
+      viewField: StructField,
+      tableField: StructField,
+      preserveUserComments: Boolean): Boolean = {
+    !preserveUserComments && viewField.getComment() != tableField.getComment()
+  }
+
   def apply(plan: LogicalPlan): Unit = {
     plan.foreach {
       case View(metaData, false, viewQuery, _)
@@ -695,19 +708,44 @@ object ViewSyncSchemaToMetaStore extends (LogicalPlan => 
Unit) {
           (field.dataType != planField.dataType ||
             field.nullable != planField.nullable ||
             (viewSchemaMode == SchemaEvolution && (
-              field.getComment() != planField.getComment() ||
-              field.name != planField.name)))
+              field.name != planField.name ||
+                shouldTriggerRedoOnCommentChange(
+                  field,
+                  planField,
+                  
session.sessionState.conf.viewSchemaEvolutionPreserveUserComments))))
         }
 
+        lazy val viewFieldsByName = viewFields.map(f => f.name -> f).toMap
+
         if (redo) {
           val newSchema = if (viewSchemaMode == SchemaTypeEvolution) {
             val newFields = viewQuery.schema.map {
               case StructField(name, dataType, nullable, _) =>
                 StructField(name, dataType, nullable,
-                  viewFields.find(_.name == name).get.metadata)
+                  viewFieldsByName(name).metadata)
+            }
+            StructType(newFields)
+          } else if 
(session.sessionState.conf.viewSchemaEvolutionPreserveUserComments) {
+            // Adopt types/nullable/names from query, but preserve view 
comments.
+            val newFields = viewQuery.schema.map { planField =>
+              val newMetadata = viewFieldsByName.get(planField.name) match {
+                case Some(viewField) =>
+                  // Use table metadata but override with view comment
+                  val builder = new 
MetadataBuilder().withMetadata(planField.metadata)
+                  viewField.getComment() match {
+                    case Some(comment) => builder.putString("comment", comment)
+                    case None => builder.remove("comment")
+                  }
+                  builder.build()
+                case None =>
+                  // New column, use table metadata as-is
+                  planField.metadata
+              }
+              StructField(planField.name, planField.dataType, 
planField.nullable, newMetadata)
             }
             StructType(newFields)
           } else {
+            // Legacy behavior: adopt everything from table including comments.
             viewQuery.schema
           }
           
SchemaUtils.checkColumnNameDuplication(fieldNames.toImmutableArraySeq,
diff --git 
a/sql/core/src/test/resources/sql-tests/results/view-schema-evolution.sql.out 
b/sql/core/src/test/resources/sql-tests/results/view-schema-evolution.sql.out
index 7410e7eaafd6..497e307f592f 100644
--- 
a/sql/core/src/test/resources/sql-tests/results/view-schema-evolution.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/results/view-schema-evolution.sql.out
@@ -897,8 +897,8 @@ DESCRIBE EXTENDED v
 -- !query schema
 struct<col_name:string,data_type:string,comment:string>
 -- !query output
-c1                     bigint                  c1 6e               
-c2                     string                  c2 6e               
+c1                     bigint                  c1 6d               
+c2                     string                  c2 6d               
                                                                    
 # Detailed Table Information                                               
 Catalog                spark_catalog                               
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
index cd52c75106e1..6970ce2798a2 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
@@ -33,7 +33,7 @@ import 
org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAM
 import org.apache.spark.sql.errors.DataTypeErrors.toSQLId
 import org.apache.spark.sql.internal.SQLConf._
 import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
-import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, 
StructType}
 import org.apache.spark.util.ArrayImplicits._
 
 /**
@@ -865,6 +865,163 @@ class PersistedViewTestSuite extends SQLViewTestSuite 
with SharedSparkSession {
     }
   }
 
+  test("Schema evolution views should preserve manually set comments") {
+    withTable("t") {
+      withView("v") {
+        // Create table with comments.
+        sql("CREATE TABLE t (c1 INT COMMENT " +
+          "'table comment 1', c2 STRING COMMENT 'table comment 2')")
+        sql("INSERT INTO t VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+
+        // Create view with schema evolution (no column list) - initially 
adopts table comments.
+        sql("CREATE VIEW v WITH SCHEMA EVOLUTION AS SELECT * FROM t")
+
+        // Verify initial comments from table are adopted.
+        val descInitial = sql("DESCRIBE EXTENDED v").collect()
+        val c1CommentInitial = descInitial.filter(r => r.getString(0) == "c1")
+        val c2CommentInitial = descInitial.filter(r => r.getString(0) == "c2")
+        assert(c1CommentInitial.nonEmpty && c1CommentInitial(0).getString(2) 
== "table comment 1",
+          "Initial c1 comment should be 'table comment 1' from table")
+        assert(c2CommentInitial.nonEmpty && c2CommentInitial(0).getString(2) 
== "table comment 2",
+          "Initial c2 comment should be 'table comment 2' from table")
+
+        // Simulate user manually changing view comments (via UI or ALTER 
COLUMN).
+        val catalog = spark.sessionState.catalog
+        val viewMeta = catalog.getTableMetadata(TableIdentifier("v"))
+        val newSchema = StructType(Seq(
+          StructField("c1", IntegerType, nullable = true).withComment("user 
comment 1"),
+          StructField("c2", StringType, nullable = true).withComment("user 
comment 2")
+        ))
+        catalog.alterTable(viewMeta.copy(schema = newSchema))
+
+        // Verify manually set comments.
+        val descManual = sql("DESCRIBE EXTENDED v").collect()
+        val c1CommentManual = descManual.filter(r => r.getString(0) == "c1")
+        val c2CommentManual = descManual.filter(r => r.getString(0) == "c2")
+        assert(c1CommentManual.nonEmpty && c1CommentManual(0).getString(2) == 
"user comment 1",
+          "c1 comment should be 'user comment 1'")
+        assert(c2CommentManual.nonEmpty && c2CommentManual(0).getString(2) == 
"user comment 2",
+          "c2 comment should be 'user comment 2'")
+
+        // SELECT from view (triggers ViewSyncSchemaToMetaStore).
+        checkAnswer(sql("SELECT * FROM v"), Seq(Row(1, "a"), Row(2, "b"), 
Row(3, "c")))
+
+        // Verify manually set comments are PRESERVED (not reverted to table 
comments).
+        val descAfterSelect = sql("DESCRIBE EXTENDED v").collect()
+        val c1CommentAfter = descAfterSelect.filter(r => r.getString(0) == 
"c1")
+        val c2CommentAfter = descAfterSelect.filter(r => r.getString(0) == 
"c2")
+        assert(c1CommentAfter.nonEmpty && c1CommentAfter(0).getString(2) == 
"user comment 1",
+          "c1 comment should still be 'user comment 1' after SELECT (bug: was 
reverted)")
+        assert(c2CommentAfter.nonEmpty && c2CommentAfter(0).getString(2) == 
"user comment 2",
+          "c2 comment should still be 'user comment 2' after SELECT (bug: was 
reverted)")
+
+        // Verify that type changes are still adopted.
+        sql("DROP TABLE t")
+        sql("CREATE TABLE t (c1 BIGINT COMMENT 'table comment changed', " +
+          "c2 DOUBLE COMMENT 'table comment changed 2')")
+        sql("INSERT INTO t VALUES (4, 5.0), (6, 7.0)")
+
+        // SELECT from view - should adopt new types but preserve view 
comments.
+        checkAnswer(sql("SELECT * FROM v"), Seq(Row(4, 5.0), Row(6, 7.0)))
+
+        // Verify types changed but comments preserved.
+        val descAfterTypeChange = sql("DESCRIBE EXTENDED v").collect()
+        val c1Final = descAfterTypeChange.filter(r => r.getString(0) == "c1")
+        val c2Final = descAfterTypeChange.filter(r => r.getString(0) == "c2")
+        assert(c1Final.nonEmpty && c1Final(0).getString(1) == "bigint",
+          "c1 type should be updated to bigint")
+        assert(c2Final.nonEmpty && c2Final(0).getString(1) == "double",
+          "c2 type should be updated to double")
+        assert(c1Final.nonEmpty && c1Final(0).getString(2) == "user comment 1",
+          "c1 comment should still be 'user comment 1' (preserved)")
+        assert(c2Final.nonEmpty && c2Final(0).getString(2) == "user comment 2",
+          "c2 comment should still be 'user comment 2' (preserved)")
+      }
+    }
+  }
+
+  test("Schema evolution comments legacy behavior with 
preserveUserComments=false") {
+    withSQLConf(VIEW_SCHEMA_EVOLUTION_PRESERVE_USER_COMMENTS.key -> "false") {
+      withTable("t") {
+        withView("v") {
+          // Create table with comments.
+          sql("CREATE TABLE t (c1 INT COMMENT " +
+            "'table comment 1', c2 STRING COMMENT 'table comment 2')")
+          sql("INSERT INTO t VALUES (1, 'a'), (2, 'b')")
+
+          // Create view with schema evolution.
+          sql("CREATE VIEW v WITH SCHEMA EVOLUTION AS SELECT * FROM t")
+
+          // User manually changes view comments.
+          val catalog = spark.sessionState.catalog
+          val viewMeta = catalog.getTableMetadata(TableIdentifier("v"))
+          val newSchema = StructType(Seq(
+            StructField("c1", IntegerType, nullable = true).withComment("user 
comment 1"),
+            StructField("c2", StringType, nullable = true).withComment("user 
comment 2")
+          ))
+          catalog.alterTable(viewMeta.copy(schema = newSchema))
+
+          // Verify manually set comments.
+          val descManual = sql("DESCRIBE EXTENDED v").collect()
+          val c1CommentManual = descManual.filter(r => r.getString(0) == "c1")
+          val c2CommentManual = descManual.filter(r => r.getString(0) == "c2")
+          assert(c1CommentManual.nonEmpty && c1CommentManual(0).getString(2) 
== "user comment 1")
+          assert(c2CommentManual.nonEmpty && c2CommentManual(0).getString(2) 
== "user comment 2")
+
+          // SELECT from view (triggers ViewSyncSchemaToMetaStore).
+          checkAnswer(sql("SELECT * FROM v"), Seq(Row(1, "a"), Row(2, "b")))
+
+          // With flag=false, comments should REVERT to table comments (legacy 
behavior).
+          val descAfterSelect = sql("DESCRIBE EXTENDED v").collect()
+          val c1CommentAfter = descAfterSelect.filter(r => r.getString(0) == 
"c1")
+          val c2CommentAfter = descAfterSelect.filter(r => r.getString(0) == 
"c2")
+          assert(c1CommentAfter.nonEmpty && c1CommentAfter(0).getString(2) == 
"table comment 1",
+            "c1 comment should revert to 'table comment 1' (legacy behavior)")
+          assert(c2CommentAfter.nonEmpty && c2CommentAfter(0).getString(2) == 
"table comment 2",
+            "c2 comment should revert to 'table comment 2' (legacy behavior)")
+        }
+      }
+    }
+  }
+
+  test("Comments are preserved when table comment changes with 
preserveUserComments=true") {
+    withTable("t") {
+      withView("v") {
+        // Create table with initial comment.
+        sql("CREATE TABLE t (c1 INT COMMENT 'original table comment')")
+        sql("INSERT INTO t VALUES (1), (2)")
+
+        // Create view with schema evolution - inherits table comment.
+        sql("CREATE VIEW v WITH SCHEMA EVOLUTION AS SELECT * FROM t")
+
+        // Verify view has inherited the table comment.
+        val descInitial = sql("DESCRIBE EXTENDED v").collect()
+        val c1Initial = descInitial.filter(r => r.getString(0) == "c1")
+        assert(c1Initial.nonEmpty && c1Initial(0).getString(2) == "original 
table comment",
+          "View should inherit table comment")
+
+        // Change the table comment.
+        sql("ALTER TABLE t CHANGE COLUMN c1 c1 INT COMMENT 'new table 
comment'")
+
+        // Verify table comment changed.
+        val descTable = sql("DESCRIBE EXTENDED t").collect()
+        val c1Table = descTable.filter(r => r.getString(0) == "c1")
+        assert(c1Table.nonEmpty && c1Table(0).getString(2) == "new table 
comment",
+          "Table comment should be updated")
+
+        // SELECT from view (triggers ViewSyncSchemaToMetaStore).
+        checkAnswer(sql("SELECT * FROM v"), Seq(Row(1), Row(2)))
+
+        // Verify view still has the original inherited comment (frozen).
+        val descAfterSelect = sql("DESCRIBE EXTENDED v").collect()
+        val c1AfterSelect = descAfterSelect.filter(r => r.getString(0) == "c1")
+        assert(c1AfterSelect.nonEmpty &&
+          c1AfterSelect(0).getString(2) == "original table comment",
+          "View should preserve inherited comment even when table comment 
changes")
+      }
+    }
+  }
+
   def getShowCreateDDL(view: String, serde: Boolean = false): String = {
     val result = if (serde) {
       sql(s"SHOW CREATE TABLE $view AS SERDE")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to