This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new fe71d9533752 [SPARK-54424][SQL] Failures during recaching must not fail operations
fe71d9533752 is described below

commit fe71d953375225e7f4c3a3fc74cd8cf468936e8a
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Sun Nov 30 10:49:40 2025 -0800

    [SPARK-54424][SQL] Failures during recaching must not fail operations
    
    ### What changes were proposed in this pull request?
    
    This PR prevents failures during recaching from failing write/refresh operations.
    
    ### Why are the changes needed?
    
    After the recent changes in SPARK-54387, a write operation may be marked as failed
    even though it committed to the table successfully and only the subsequent cache
    refresh was unsuccessful.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, `recacheByXXX` will no longer throw an exception if recaching fails.
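    
    A minimal sketch of the new behavior (hypothetical table name `cat.ns.tbl`; assumes a
    V2 catalog whose table schema can change underneath a cached plan), mirroring one of
    the tests added in this PR:
    
        spark.table("cat.ns.tbl").cache()
        // an external change drops a column that the cached plan still references
        spark.sql("REFRESH TABLE cat.ns.tbl")  // previously could throw; now succeeds
        // the stale cache entry is dropped instead of the command failing
        assert(spark.sharedState.cacheManager.isEmpty)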
    
    ### How was this patch tested?
    
    This PR comes with tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #53143 from aokolnychyi/spark-54424.
    
    Authored-by: Anton Okolnychyi <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit 7833e2fccf9faf24175f60db6e6187edd2e1be9f)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../apache/spark/sql/execution/CacheManager.scala  |  73 +++++++++--
 .../spark/sql/execution/QueryExecution.scala       |  23 +++-
 .../datasources/v2/V2TableRefreshUtil.scala        |  22 ----
 .../datasources/v2/WriteToDataSourceV2Exec.scala   |  14 +-
 .../sql/connector/DataSourceV2DataFrameSuite.scala | 146 +++++++++++++++++++++
 5 files changed, 233 insertions(+), 45 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 5a38751b61e1..34e47084f656 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution
 
+import scala.util.control.NonFatal
+
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.internal.{Logging, MessageWithContext}
@@ -374,25 +376,68 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
     }
     needToRecache.foreach { cd =>
       cd.cachedRepresentation.cacheBuilder.clearCache()
-      val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
-      val (newKey, newCache) = sessionWithConfigsOff.withActive {
-        val refreshedPlan = V2TableRefreshUtil.refresh(sessionWithConfigsOff, cd.plan)
-        val qe = sessionWithConfigsOff.sessionState.executePlan(refreshedPlan)
-        qe.normalized -> InMemoryRelation(cd.cachedRepresentation.cacheBuilder, qe)
-      }
-      val recomputedPlan = cd.copy(plan = newKey, cachedRepresentation = newCache)
-      this.synchronized {
-        if (lookupCachedDataInternal(recomputedPlan.plan).nonEmpty) {
-          logWarning("While recaching, data was already added to cache.")
-        } else {
-          cachedData = recomputedPlan +: cachedData
-          CacheManager.logCacheOperation(log"Re-cached Dataframe cache entry:" +
-            log"${MDC(DATAFRAME_CACHE_ENTRY, recomputedPlan)}")
+      tryRebuildCacheEntry(spark, cd).foreach { entry =>
+        this.synchronized {
+          if (lookupCachedDataInternal(entry.plan).nonEmpty) {
+            logWarning("While recaching, data was already added to cache.")
+          } else {
+            cachedData = entry +: cachedData
+            CacheManager.logCacheOperation(log"Re-cached Dataframe cache entry:" +
+              log"${MDC(DATAFRAME_CACHE_ENTRY, entry)}")
+          }
         }
       }
     }
   }
 
+  private def tryRebuildCacheEntry(spark: SparkSession, cd: CachedData): Option[CachedData] = {
+    val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
+    sessionWithConfigsOff.withActive {
+      tryRefreshPlan(sessionWithConfigsOff, cd.plan).map { refreshedPlan =>
+        val qe = QueryExecution.create(
+          sessionWithConfigsOff,
+          refreshedPlan,
+          refreshPhaseEnabled = false)
+        val newKey = qe.normalized
+        val newCache = InMemoryRelation(cd.cachedRepresentation.cacheBuilder, qe)
+        cd.copy(plan = newKey, cachedRepresentation = newCache)
+      }
+    }
+  }
+
+  /**
+   * Attempts to refresh table metadata loaded through the catalog.
+   *
+   * If the table state is cached (e.g., via `CACHE TABLE t`), the relation is replaced with
+   * updated metadata as long as the table ID still matches, ensuring that all schema changes
+   * are reflected. Otherwise, a new plan is produced using refreshed table metadata but
+   * retaining the original schema, provided the schema changes are still compatible with the
+   * query (e.g., adding new columns should be acceptable).
+   *
+   * Note this logic applies only to V2 tables at the moment.
+   *
+   * @return the refreshed plan if refresh succeeds, None otherwise
+   */
+  private def tryRefreshPlan(spark: SparkSession, plan: LogicalPlan): Option[LogicalPlan] = {
+    try {
+      EliminateSubqueryAliases(plan) match {
+        case r @ ExtractV2CatalogAndIdentifier(catalog, ident) if r.timeTravelSpec.isEmpty =>
+          val table = catalog.loadTable(ident)
+          if (r.table.id == table.id) {
+            Some(DataSourceV2Relation.create(table, Some(catalog), Some(ident)))
+          } else {
+            None
+          }
+        case _ =>
+          Some(V2TableRefreshUtil.refresh(spark, plan))
+      }
+    } catch {
+      case NonFatal(e) =>
+        logWarning(log"Failed to refresh plan while attempting to recache", e)
+        None
+    }
+  }
+
   private[sql] def lookupCachedTable(
       name: Seq[String],
       resolver: Resolver): Option[LogicalPlan] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 26d2078791aa..3e0aef962e71 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -66,7 +66,8 @@ class QueryExecution(
     val logical: LogicalPlan,
     val tracker: QueryPlanningTracker = new QueryPlanningTracker,
     val mode: CommandExecutionMode.Value = CommandExecutionMode.ALL,
-    val shuffleCleanupMode: ShuffleCleanupMode = DoNotCleanup) extends Logging {
+    val shuffleCleanupMode: ShuffleCleanupMode = DoNotCleanup,
+    val refreshPhaseEnabled: Boolean = true) extends Logging {
 
   val id: Long = QueryExecution.nextExecutionId
 
@@ -178,7 +179,7 @@ class QueryExecution(
       // for eagerly executed commands we mark this place as beginning of execution.
       tracker.setReadyForExecution()
       val qe = new QueryExecution(sparkSession, p, mode = mode,
-        shuffleCleanupMode = shuffleCleanupMode)
+        shuffleCleanupMode = shuffleCleanupMode, refreshPhaseEnabled = refreshPhaseEnabled)
       val result = QueryExecution.withInternalError(s"Eagerly executed $name failed.") {
         SQLExecution.withNewExecutionId(qe, Some(name)) {
           qe.executedPlan.executeCollect()
@@ -207,7 +208,11 @@ class QueryExecution(
   // there may be delay between analysis and subsequent phases
   // therefore, refresh captured table versions to reflect latest data
   private val lazyTableVersionsRefreshed = LazyTry {
-    V2TableRefreshUtil.refresh(sparkSession, commandExecuted, versionedOnly = true)
+    if (refreshPhaseEnabled) {
+      V2TableRefreshUtil.refresh(sparkSession, commandExecuted, versionedOnly = true)
+    } else {
+      commandExecuted
+    }
   }
 
   private[sql] def tableVersionsRefreshed: LogicalPlan = lazyTableVersionsRefreshed.get
@@ -569,6 +574,18 @@ object QueryExecution {
 
   private def nextExecutionId: Long = _nextExecutionId.getAndIncrement
 
+  private[execution] def create(
+      sparkSession: SparkSession,
+      logical: LogicalPlan,
+      refreshPhaseEnabled: Boolean = true): QueryExecution = {
+    new QueryExecution(
+      sparkSession,
+      logical,
+      mode = CommandExecutionMode.ALL,
+      shuffleCleanupMode = determineShuffleCleanupMode(sparkSession.sessionState.conf),
+      refreshPhaseEnabled = refreshPhaseEnabled)
+  }
+
   /**
    * Construct a sequence of rules that are used to prepare a planned [[SparkPlan]] for execution.
    * These rules will make sure subqueries are planned, make sure the data partitioning and ordering
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2TableRefreshUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2TableRefreshUtil.scala
index 945ab122d54e..151329de9e6f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2TableRefreshUtil.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2TableRefreshUtil.scala
@@ -21,7 +21,6 @@ import scala.collection.mutable
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.catalyst.analysis.AsOfVersion
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
 import org.apache.spark.sql.classic.SparkSession
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, V2TableUtil}
@@ -32,27 +31,6 @@ import org.apache.spark.sql.util.SchemaValidationMode.ALLOW_NEW_FIELDS
 import org.apache.spark.sql.util.SchemaValidationMode.PROHIBIT_CHANGES
 
 private[sql] object V2TableRefreshUtil extends SQLConfHelper with Logging {
-  /**
-   * Pins table versions for all versioned tables in the plan.
-   *
-   * This method captures the current version of each versioned table by adding time travel
-   * specifications. Tables that already have time travel specifications or are not versioned
-   * are left unchanged.
-   *
-   * @param plan the logical plan to pin versions for
-   * @return plan with pinned table versions
-   */
-  def pinVersions(plan: LogicalPlan): LogicalPlan = {
-    plan transform {
-      case r @ ExtractV2CatalogAndIdentifier(catalog, ident)
-          if r.isVersioned && r.timeTravelSpec.isEmpty =>
-        val tableName = V2TableUtil.toQualifiedName(catalog, ident)
-        val version = r.table.version
-        logDebug(s"Pinning table version for $tableName to $version")
-        r.copy(timeTravelSpec = Some(AsOfVersion(version)))
-    }
-  }
-
   /**
    * Refreshes table metadata for tables in the plan.
    *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 6c574be91ebf..75915d97ba4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.metric.CustomMetric
 import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, MergeSummaryImpl, PhysicalWriteInfoImpl, RowLevelOperationTable, Write, WriterCommitMessage, WriteSummary}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
-import org.apache.spark.sql.execution.{SparkPlan, SQLExecution, UnaryExecNode}
+import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SQLExecution, UnaryExecNode}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.joins.BaseJoinExec
 import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric, SQLMetrics}
@@ -177,7 +177,6 @@ case class ReplaceTableAsSelectExec(
       query,
       versionedOnly = true,
       schemaValidationMode = PROHIBIT_CHANGES)
-    val pinnedQuery = V2TableRefreshUtil.pinVersions(refreshedQuery)
     if (catalog.tableExists(ident)) {
       invalidateCache(catalog, ident)
       catalog.dropTable(ident)
@@ -185,13 +184,15 @@ case class ReplaceTableAsSelectExec(
       throw QueryCompilationErrors.cannotReplaceMissingTableError(ident)
     }
     val tableInfo = new TableInfo.Builder()
-      .withColumns(getV2Columns(pinnedQuery.schema, catalog.useNullableQuerySchema))
+      .withColumns(getV2Columns(refreshedQuery.schema, catalog.useNullableQuerySchema))
       .withPartitions(partitioning.toArray)
       .withProperties(properties.asJava)
       .build()
     val table = Option(catalog.createTable(ident, tableInfo))
       .getOrElse(catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
-    writeToTable(catalog, table, writeOptions, ident, pinnedQuery, overwrite = true)
+    writeToTable(
+      catalog, table, writeOptions, ident, refreshedQuery,
+      overwrite = true, refreshPhaseEnabled = false)
   }
 }
 
@@ -764,7 +765,8 @@ private[v2] trait V2CreateTableAsSelectBaseExec extends LeafV2CommandExec {
       writeOptions: Map[String, String],
       ident: Identifier,
       query: LogicalPlan,
-      overwrite: Boolean): Seq[InternalRow] = {
+      overwrite: Boolean,
+      refreshPhaseEnabled: Boolean = true): Seq[InternalRow] = {
     Utils.tryWithSafeFinallyAndFailureCallbacks({
       val relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
       val writeCommand = if (overwrite) {
@@ -772,7 +774,7 @@ private[v2] trait V2CreateTableAsSelectBaseExec extends LeafV2CommandExec {
       } else {
         AppendData.byPosition(relation, query, writeOptions)
       }
-      val qe = session.sessionState.executePlan(writeCommand)
+      val qe = QueryExecution.create(session, writeCommand, refreshPhaseEnabled)
       qe.assertCommandExecuted()
       DataSourceV2Utils.commitStagedChanges(sparkContext, table, metrics)
       Nil
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
index 5f4aee277831..cdb498c8f2bf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSel
 import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, DefaultValue, Identifier, InMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableInfo}
 import org.apache.spark.sql.connector.catalog.BasicInMemoryTableCatalog
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, UpdateColumnDefaultValue}
+import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.connector.catalog.TableWritePrivilege
 import org.apache.spark.sql.connector.catalog.TruncatableTable
 import org.apache.spark.sql.connector.expressions.{ApplyTransform, GeneralScalarExpression, LiteralValue, Transform}
@@ -1894,6 +1895,151 @@ class DataSourceV2DataFrameSuite
     }
   }
 
+  test("SPARK-54424: refresh table cache on schema changes (column removed)") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, value INT, category STRING) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 10, 'A'), (2, 20, 'B'), (3, 30, 'A')")
+
+      // cache table
+      spark.table(t).cache()
+
+      // verify caching works as expected
+      assertCached(spark.table(t))
+      checkAnswer(
+        spark.table(t),
+        Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3, 30, "A")))
+
+      // evolve table directly to mimic external changes
+      // these external changes make cached plan invalid (column is no longer there)
+      val change = TableChange.deleteColumn(Array("category"), false)
+      catalog("testcat").alterTable(ident, change)
+
+      // refresh table is supposed to trigger recaching
+      spark.sql(s"REFRESH TABLE $t")
+
+      // recaching is expected to succeed
+      assert(spark.sharedState.cacheManager.numCachedEntries == 1)
+
+      // verify cache reflects latest schema and data
+      assertCached(spark.table(t))
+      checkAnswer(spark.table(t), Seq(Row(1, 10), Row(2, 20), Row(3, 30)))
+    }
+  }
+
+  test("SPARK-54424: refresh table cache on schema changes (column added)") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, value INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 10), (2, 20), (3, 30)")
+
+      // cache table
+      spark.table(t).cache()
+
+      // verify caching works as expected
+      assertCached(spark.table(t))
+      checkAnswer(
+        spark.table(t),
+        Seq(Row(1, 10), Row(2, 20), Row(3, 30)))
+
+      // evolve table directly to mimic external changes
+      // these external changes make cached plan invalid (table state has changed)
+      val change = TableChange.addColumn(Array("category"), StringType, true)
+      catalog("testcat").alterTable(ident, change)
+
+      // refresh table is supposed to trigger recaching
+      spark.sql(s"REFRESH TABLE $t")
+
+      // recaching is expected to succeed
+      assert(spark.sharedState.cacheManager.numCachedEntries == 1)
+
+      // verify cache reflects latest schema and data
+      assertCached(spark.table(t))
+      checkAnswer(spark.table(t), Seq(Row(1, 10, null), Row(2, 20, null), Row(3, 30, null)))
+    }
+  }
+
+  test("SPARK-54424: successfully refresh cache with compatible schema changes") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, value INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 10), (2, 20), (3, 30)")
+
+      // cache query
+      val df = spark.table(t).filter("id < 100")
+      df.cache()
+
+      // verify caching works as expected
+      assertCached(spark.table(t).filter("id < 100"))
+      checkAnswer(
+        spark.table(t).filter("id < 100"),
+        Seq(Row(1, 10), Row(2, 20), Row(3, 30)))
+
+      // evolve table directly to mimic external changes
+      // adding columns should be OK
+      val change = TableChange.addColumn(Array("category"), StringType, true)
+      catalog("testcat").alterTable(ident, change)
+
+      // refresh table is supposed to trigger recaching
+      spark.sql(s"REFRESH TABLE $t")
+
+      // recaching is expected to succeed
+      assert(spark.sharedState.cacheManager.numCachedEntries == 1)
+
+      // verify derived queries still benefit from refreshed cache
+      assertCached(df.filter("id > 0"))
+      checkAnswer(df.filter("id > 0"), Seq(Row(1, 10), Row(2, 20), Row(3, 30)))
+
+      // add more data
+      sql(s"INSERT INTO $t VALUES (4, 40, '40')")
+
+      // verify derived queries still benefit from refreshed cache
+      assertCached(df.filter("id > 0"))
+      checkAnswer(df.filter("id > 0"), Seq(Row(1, 10), Row(2, 20), Row(3, 30), Row(4, 40)))
+
+      // verify latest schema is propagated (new column has NULL values for existing rows)
+      checkAnswer(
+        spark.table(t),
+        Seq(Row(1, 10, null), Row(2, 20, null), Row(3, 30, null), Row(4, 40, "40")))
+    }
+  }
+
+  test("SPARK-54424: inability to refresh cache shouldn't fail operations") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, value INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 10), (2, 20), (3, 30)")
+
+      // cache query
+      val df = spark.table(t).filter("id < 100")
+      df.cache()
+
+      // verify caching works as expected
+      assertCached(spark.table(t).filter("id < 100"))
+      checkAnswer(
+        spark.table(t).filter("id < 100"),
+        Seq(Row(1, 10), Row(2, 20), Row(3, 30)))
+
+      // evolve table directly to mimic external changes
+      // removing columns should make the cached plan invalid
+      val change = TableChange.deleteColumn(Array("value"), false)
+      catalog("testcat").alterTable(ident, change)
+
+      // refresh table is supposed to trigger recaching
+      spark.sql(s"REFRESH TABLE $t")
+
+      // recaching is expected to fail
+      assert(spark.sharedState.cacheManager.isEmpty)
+
+      // verify latest schema is propagated
+      checkAnswer(spark.table(t), Seq(Row(1), Row(2), Row(3)))
+    }
+  }
+
   private def pinTable(catalogName: String, ident: Identifier, version: String): Unit = {
     catalog(catalogName) match {
       case inMemory: BasicInMemoryTableCatalog => inMemory.pinTable(ident, version)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
