This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e886428cdfd3 [SPARK-54735][SQL] Properly preserve column comments in
view with SCHEMA EVOLUTION
e886428cdfd3 is described below
commit e886428cdfd3f59232bc037e8665b70e75863d25
Author: Milan Dankovic <[email protected]>
AuthorDate: Mon Dec 22 17:56:42 2025 +0800
[SPARK-54735][SQL] Properly preserve column comments in view with SCHEMA
EVOLUTION
### What changes were proposed in this pull request?
In this PR I propose a fix to avoid overwriting user-defined comments in
views that have schema evolution enabled.
### Why are the changes needed?
When a user creates a view with `SCHEMA EVOLUTION` and then changes the
comments on the view columns, `DESCRIBE` shows the new comments. However,
after a query is executed on the view, the comment values are reverted to
the values from the underlying table (because schema evolution is enabled).
With the proposed fix, if a column name is changed, the comments from the
table are adopted for that column (consistent with evolution semantics).
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit test in `SQLViewTestSuite`.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #53508 from miland-db/milan-dankovic_data/master.
Authored-by: Milan Dankovic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/catalyst/analysis/Analyzer.scala | 1 +
.../org/apache/spark/sql/internal/SQLConf.scala | 13 ++
.../spark/sql/execution/datasources/rules.scala | 46 +++++-
.../results/view-schema-evolution.sql.out | 4 +-
.../spark/sql/execution/SQLViewTestSuite.scala | 159 ++++++++++++++++++++-
5 files changed, 216 insertions(+), 7 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 7e576cf76f4d..801b494e06de 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -235,6 +235,7 @@ object AnalysisContext {
object Analyzer {
// List of configurations that should be passed on when resolving views and
SQL UDF.
private val RETAINED_ANALYSIS_FLAGS = Seq(
+ "spark.sql.view.schemaEvolution.preserveUserComments",
// retainedHiveConfigs
// TODO: remove these Hive-related configs after the `RelationConversions`
is moved to
// optimization phase.
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 24b283df81c9..413f127f5fd7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2188,6 +2188,16 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val VIEW_SCHEMA_EVOLUTION_PRESERVE_USER_COMMENTS =
+ buildConf("spark.sql.view.schemaEvolution.preserveUserComments")
+ .internal()
+ .doc("When enabled, views with SCHEMA EVOLUTION mode will preserve
user-set view comments " +
+ "when the underlying table schema evolves. When disabled, view
comments will be " +
+ "overwritten with table comments on every schema sync.")
+ .version("4.2.0")
+ .booleanConf
+ .createWithDefault(true)
+
val OUTPUT_COMMITTER_CLASS =
buildConf("spark.sql.sources.outputCommitterClass")
.version("1.4.0")
.internal()
@@ -7485,6 +7495,9 @@ class SQLConf extends Serializable with Logging with
SqlApiConf {
def viewSchemaCompensation: Boolean = getConf(VIEW_SCHEMA_COMPENSATION)
+ def viewSchemaEvolutionPreserveUserComments: Boolean =
+ getConf(VIEW_SCHEMA_EVOLUTION_PRESERVE_USER_COMMENTS)
+
def defaultCacheStorageLevel: StorageLevel =
StorageLevel.fromString(getConf(DEFAULT_CACHE_STORAGE_LEVEL).name())
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 060d7fe72c0a..b8e0627dfc43 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -39,7 +39,7 @@ import
org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1}
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.InsertableRelation
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
import org.apache.spark.sql.util.PartitioningUtils.normalizePartitionSpec
import org.apache.spark.sql.util.SchemaUtils
import org.apache.spark.util.ArrayImplicits._
@@ -677,6 +677,19 @@ case class QualifyLocationWithWarehouse(catalog:
SessionCatalog) extends Rule[Lo
* It does so by walking the resolved plan looking for View operators for
persisted views.
*/
object ViewSyncSchemaToMetaStore extends (LogicalPlan => Unit) {
+
+ /**
+ * Checks if comment changes between view and table should trigger schema
sync.
+ * When preserveUserComments flag is enabled, comment differences should NOT
trigger sync
+ * because we want to preserve user-set view comments.
+ */
+ private def shouldTriggerRedoOnCommentChange(
+ viewField: StructField,
+ tableField: StructField,
+ preserveUserComments: Boolean): Boolean = {
+ !preserveUserComments && viewField.getComment() != tableField.getComment()
+ }
+
def apply(plan: LogicalPlan): Unit = {
plan.foreach {
case View(metaData, false, viewQuery, _)
@@ -695,19 +708,44 @@ object ViewSyncSchemaToMetaStore extends (LogicalPlan =>
Unit) {
(field.dataType != planField.dataType ||
field.nullable != planField.nullable ||
(viewSchemaMode == SchemaEvolution && (
- field.getComment() != planField.getComment() ||
- field.name != planField.name)))
+ field.name != planField.name ||
+ shouldTriggerRedoOnCommentChange(
+ field,
+ planField,
+
session.sessionState.conf.viewSchemaEvolutionPreserveUserComments))))
}
+ lazy val viewFieldsByName = viewFields.map(f => f.name -> f).toMap
+
if (redo) {
val newSchema = if (viewSchemaMode == SchemaTypeEvolution) {
val newFields = viewQuery.schema.map {
case StructField(name, dataType, nullable, _) =>
StructField(name, dataType, nullable,
- viewFields.find(_.name == name).get.metadata)
+ viewFieldsByName(name).metadata)
+ }
+ StructType(newFields)
+ } else if
(session.sessionState.conf.viewSchemaEvolutionPreserveUserComments) {
+ // Adopt types/nullable/names from query, but preserve view
comments.
+ val newFields = viewQuery.schema.map { planField =>
+ val newMetadata = viewFieldsByName.get(planField.name) match {
+ case Some(viewField) =>
+ // Use table metadata but override with view comment
+ val builder = new
MetadataBuilder().withMetadata(planField.metadata)
+ viewField.getComment() match {
+ case Some(comment) => builder.putString("comment", comment)
+ case None => builder.remove("comment")
+ }
+ builder.build()
+ case None =>
+ // New column, use table metadata as-is
+ planField.metadata
+ }
+ StructField(planField.name, planField.dataType,
planField.nullable, newMetadata)
}
StructType(newFields)
} else {
+ // Legacy behavior: adopt everything from table including comments.
viewQuery.schema
}
SchemaUtils.checkColumnNameDuplication(fieldNames.toImmutableArraySeq,
diff --git
a/sql/core/src/test/resources/sql-tests/results/view-schema-evolution.sql.out
b/sql/core/src/test/resources/sql-tests/results/view-schema-evolution.sql.out
index 7410e7eaafd6..497e307f592f 100644
---
a/sql/core/src/test/resources/sql-tests/results/view-schema-evolution.sql.out
+++
b/sql/core/src/test/resources/sql-tests/results/view-schema-evolution.sql.out
@@ -897,8 +897,8 @@ DESCRIBE EXTENDED v
-- !query schema
struct<col_name:string,data_type:string,comment:string>
-- !query output
-c1 bigint c1 6e
-c2 string c2 6e
+c1 bigint c1 6d
+c2 string c2 6d
# Detailed Table Information
Catalog spark_catalog
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
index cd52c75106e1..6970ce2798a2 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
@@ -33,7 +33,7 @@ import
org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAM
import org.apache.spark.sql.errors.DataTypeErrors.toSQLId
import org.apache.spark.sql.internal.SQLConf._
import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
-import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField,
StructType}
import org.apache.spark.util.ArrayImplicits._
/**
@@ -865,6 +865,163 @@ class PersistedViewTestSuite extends SQLViewTestSuite
with SharedSparkSession {
}
}
+ test("Schema evolution views should preserve manually set comments") {
+ withTable("t") {
+ withView("v") {
+ // Create table with comments.
+ sql("CREATE TABLE t (c1 INT COMMENT " +
+ "'table comment 1', c2 STRING COMMENT 'table comment 2')")
+ sql("INSERT INTO t VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+
+ // Create view with schema evolution (no column list) - initially
adopts table comments.
+ sql("CREATE VIEW v WITH SCHEMA EVOLUTION AS SELECT * FROM t")
+
+ // Verify initial comments from table are adopted.
+ val descInitial = sql("DESCRIBE EXTENDED v").collect()
+ val c1CommentInitial = descInitial.filter(r => r.getString(0) == "c1")
+ val c2CommentInitial = descInitial.filter(r => r.getString(0) == "c2")
+ assert(c1CommentInitial.nonEmpty && c1CommentInitial(0).getString(2)
== "table comment 1",
+ "Initial c1 comment should be 'table comment 1' from table")
+ assert(c2CommentInitial.nonEmpty && c2CommentInitial(0).getString(2)
== "table comment 2",
+ "Initial c2 comment should be 'table comment 2' from table")
+
+ // Simulate user manually changing view comments (via UI or ALTER
COLUMN).
+ val catalog = spark.sessionState.catalog
+ val viewMeta = catalog.getTableMetadata(TableIdentifier("v"))
+ val newSchema = StructType(Seq(
+ StructField("c1", IntegerType, nullable = true).withComment("user
comment 1"),
+ StructField("c2", StringType, nullable = true).withComment("user
comment 2")
+ ))
+ catalog.alterTable(viewMeta.copy(schema = newSchema))
+
+ // Verify manually set comments.
+ val descManual = sql("DESCRIBE EXTENDED v").collect()
+ val c1CommentManual = descManual.filter(r => r.getString(0) == "c1")
+ val c2CommentManual = descManual.filter(r => r.getString(0) == "c2")
+ assert(c1CommentManual.nonEmpty && c1CommentManual(0).getString(2) ==
"user comment 1",
+ "c1 comment should be 'user comment 1'")
+ assert(c2CommentManual.nonEmpty && c2CommentManual(0).getString(2) ==
"user comment 2",
+ "c2 comment should be 'user comment 2'")
+
+ // SELECT from view (triggers ViewSyncSchemaToMetaStore).
+ checkAnswer(sql("SELECT * FROM v"), Seq(Row(1, "a"), Row(2, "b"),
Row(3, "c")))
+
+ // Verify manually set comments are PRESERVED (not reverted to table
comments).
+ val descAfterSelect = sql("DESCRIBE EXTENDED v").collect()
+ val c1CommentAfter = descAfterSelect.filter(r => r.getString(0) ==
"c1")
+ val c2CommentAfter = descAfterSelect.filter(r => r.getString(0) ==
"c2")
+ assert(c1CommentAfter.nonEmpty && c1CommentAfter(0).getString(2) ==
"user comment 1",
+ "c1 comment should still be 'user comment 1' after SELECT (bug: was
reverted)")
+ assert(c2CommentAfter.nonEmpty && c2CommentAfter(0).getString(2) ==
"user comment 2",
+ "c2 comment should still be 'user comment 2' after SELECT (bug: was
reverted)")
+
+ // Verify that type changes are still adopted.
+ sql("DROP TABLE t")
+ sql("CREATE TABLE t (c1 BIGINT COMMENT 'table comment changed', " +
+ "c2 DOUBLE COMMENT 'table comment changed 2')")
+ sql("INSERT INTO t VALUES (4, 5.0), (6, 7.0)")
+
+ // SELECT from view - should adopt new types but preserve view
comments.
+ checkAnswer(sql("SELECT * FROM v"), Seq(Row(4, 5.0), Row(6, 7.0)))
+
+ // Verify types changed but comments preserved.
+ val descAfterTypeChange = sql("DESCRIBE EXTENDED v").collect()
+ val c1Final = descAfterTypeChange.filter(r => r.getString(0) == "c1")
+ val c2Final = descAfterTypeChange.filter(r => r.getString(0) == "c2")
+ assert(c1Final.nonEmpty && c1Final(0).getString(1) == "bigint",
+ "c1 type should be updated to bigint")
+ assert(c2Final.nonEmpty && c2Final(0).getString(1) == "double",
+ "c2 type should be updated to double")
+ assert(c1Final.nonEmpty && c1Final(0).getString(2) == "user comment 1",
+ "c1 comment should still be 'user comment 1' (preserved)")
+ assert(c2Final.nonEmpty && c2Final(0).getString(2) == "user comment 2",
+ "c2 comment should still be 'user comment 2' (preserved)")
+ }
+ }
+ }
+
+ test("Schema evolution comments legacy behavior with
preserveUserComments=false") {
+ withSQLConf(VIEW_SCHEMA_EVOLUTION_PRESERVE_USER_COMMENTS.key -> "false") {
+ withTable("t") {
+ withView("v") {
+ // Create table with comments.
+ sql("CREATE TABLE t (c1 INT COMMENT " +
+ "'table comment 1', c2 STRING COMMENT 'table comment 2')")
+ sql("INSERT INTO t VALUES (1, 'a'), (2, 'b')")
+
+ // Create view with schema evolution.
+ sql("CREATE VIEW v WITH SCHEMA EVOLUTION AS SELECT * FROM t")
+
+ // User manually changes view comments.
+ val catalog = spark.sessionState.catalog
+ val viewMeta = catalog.getTableMetadata(TableIdentifier("v"))
+ val newSchema = StructType(Seq(
+ StructField("c1", IntegerType, nullable = true).withComment("user
comment 1"),
+ StructField("c2", StringType, nullable = true).withComment("user
comment 2")
+ ))
+ catalog.alterTable(viewMeta.copy(schema = newSchema))
+
+ // Verify manually set comments.
+ val descManual = sql("DESCRIBE EXTENDED v").collect()
+ val c1CommentManual = descManual.filter(r => r.getString(0) == "c1")
+ val c2CommentManual = descManual.filter(r => r.getString(0) == "c2")
+ assert(c1CommentManual.nonEmpty && c1CommentManual(0).getString(2)
== "user comment 1")
+ assert(c2CommentManual.nonEmpty && c2CommentManual(0).getString(2)
== "user comment 2")
+
+ // SELECT from view (triggers ViewSyncSchemaToMetaStore).
+ checkAnswer(sql("SELECT * FROM v"), Seq(Row(1, "a"), Row(2, "b")))
+
+ // With flag=false, comments should REVERT to table comments (legacy
behavior).
+ val descAfterSelect = sql("DESCRIBE EXTENDED v").collect()
+ val c1CommentAfter = descAfterSelect.filter(r => r.getString(0) ==
"c1")
+ val c2CommentAfter = descAfterSelect.filter(r => r.getString(0) ==
"c2")
+ assert(c1CommentAfter.nonEmpty && c1CommentAfter(0).getString(2) ==
"table comment 1",
+ "c1 comment should revert to 'table comment 1' (legacy behavior)")
+ assert(c2CommentAfter.nonEmpty && c2CommentAfter(0).getString(2) ==
"table comment 2",
+ "c2 comment should revert to 'table comment 2' (legacy behavior)")
+ }
+ }
+ }
+ }
+
+ test("Comments are preserved when table comment changes with
preserveUserComments=true") {
+ withTable("t") {
+ withView("v") {
+ // Create table with initial comment.
+ sql("CREATE TABLE t (c1 INT COMMENT 'original table comment')")
+ sql("INSERT INTO t VALUES (1), (2)")
+
+ // Create view with schema evolution - inherits table comment.
+ sql("CREATE VIEW v WITH SCHEMA EVOLUTION AS SELECT * FROM t")
+
+ // Verify view has inherited the table comment.
+ val descInitial = sql("DESCRIBE EXTENDED v").collect()
+ val c1Initial = descInitial.filter(r => r.getString(0) == "c1")
+ assert(c1Initial.nonEmpty && c1Initial(0).getString(2) == "original
table comment",
+ "View should inherit table comment")
+
+ // Change the table comment.
+ sql("ALTER TABLE t CHANGE COLUMN c1 c1 INT COMMENT 'new table
comment'")
+
+ // Verify table comment changed.
+ val descTable = sql("DESCRIBE EXTENDED t").collect()
+ val c1Table = descTable.filter(r => r.getString(0) == "c1")
+ assert(c1Table.nonEmpty && c1Table(0).getString(2) == "new table
comment",
+ "Table comment should be updated")
+
+ // SELECT from view (triggers ViewSyncSchemaToMetaStore).
+ checkAnswer(sql("SELECT * FROM v"), Seq(Row(1), Row(2)))
+
+ // Verify view still has the original inherited comment (frozen).
+ val descAfterSelect = sql("DESCRIBE EXTENDED v").collect()
+ val c1AfterSelect = descAfterSelect.filter(r => r.getString(0) == "c1")
+ assert(c1AfterSelect.nonEmpty &&
+ c1AfterSelect(0).getString(2) == "original table comment",
+ "View should preserve inherited comment even when table comment
changes")
+ }
+ }
+ }
+
def getShowCreateDDL(view: String, serde: Boolean = false): String = {
val result = if (serde) {
sql(s"SHOW CREATE TABLE $view AS SERDE")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]