This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b3703755d805 [SPARK-55631][SQL] ALTER TABLE must invalidate cache for 
DSv2 tables
b3703755d805 is described below

commit b3703755d80585297367d539de9fa8c5783b1c6b
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Wed Feb 25 14:07:14 2026 -0800

    [SPARK-55631][SQL] ALTER TABLE must invalidate cache for DSv2 tables
    
    ### What changes were proposed in this pull request?
    
    This PR makes the ALTER TABLE command invalidate the table cache.
    
    ### Why are the changes needed?
    
    These changes are needed to reflect changes made in the session correctly.
    
    ### Does this PR introduce _any_ user-facing change?
    
    ALTER TABLE commands will now invalidate the cache.
    
    ### How was this patch tested?
    
    This PR comes with a test that would previously fail.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #54427 from aokolnychyi/spark-55631.
    
    Authored-by: Anton Okolnychyi <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../sql/execution/datasources/v2/AlterTableExec.scala  |  5 +++--
 .../datasources/v2/DataSourceV2Strategy.scala          |  6 +++++-
 .../scala/org/apache/spark/sql/CachedTableSuite.scala  | 18 ++++++++++++++++++
 3 files changed, 26 insertions(+), 3 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterTableExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterTableExec.scala
index 7b69c5d6f6bd..fe834f687e49 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterTableExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterTableExec.scala
@@ -31,7 +31,8 @@ import org.apache.spark.sql.execution.{SparkPlan, 
UnaryExecNode}
 case class AlterTableExec(
     catalog: TableCatalog,
     ident: Identifier,
-    changes: Seq[TableChange]) extends LeafV2CommandExec {
+    changes: Seq[TableChange],
+    refreshCache: () => Unit) extends LeafV2CommandExec {
 
   override def output: Seq[Attribute] = Seq.empty
 
@@ -42,7 +43,7 @@ case class AlterTableExec(
       case e: IllegalArgumentException if !e.isInstanceOf[SparkThrowable] =>
         throw QueryExecutionErrors.unsupportedTableChangeError(e)
     }
-
+    refreshCache()
     Seq.empty
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 81bc1990404a..014b43c91543 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -579,7 +579,11 @@ class DataSourceV2Strategy(session: SparkSession) extends 
Strategy with Predicat
       val table = a.table.asInstanceOf[ResolvedTable]
       ResolveTableConstraints.validateCatalogForTableChange(
         a.changes, table.catalog, table.identifier)
-      AlterTableExec(table.catalog, table.identifier, a.changes) :: Nil
+      AlterTableExec(
+        table.catalog,
+        table.identifier,
+        a.changes,
+        recacheTable(table, includeTimeTravel = false)) :: Nil
 
     case CreateIndex(ResolvedTable(_, _, table, _),
         indexName, indexType, ifNotExists, columns, properties) =>
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 234383239e90..a928a9131d47 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -2663,6 +2663,24 @@ class CachedTableSuite extends QueryTest
     }
   }
 
+  test("ALTER TABLE invalidates cached table") {
+    val t = "testcat.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id int, data string) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 'a'), (2, 'b')")
+
+      sql(s"CACHE TABLE $t")
+      assertCached(sql(s"SELECT * FROM $t"))
+      checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, "a"), Row(2, "b")))
+
+      sql(s"ALTER TABLE $t ADD COLUMN new_col int")
+
+      val result = sql(s"SELECT * FROM $t ORDER BY id")
+      assertCached(result)
+      checkAnswer(result, Seq(Row(1, "a", null), Row(2, "b", null)))
+    }
+  }
+
   private def cacheManager = spark.sharedState.cacheManager
 
   private def pinTable(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to