This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new fe71d9533752 [SPARK-54424][SQL] Failures during recaching must not fail operations
fe71d9533752 is described below
commit fe71d953375225e7f4c3a3fc74cd8cf468936e8a
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Sun Nov 30 10:49:40 2025 -0800
[SPARK-54424][SQL] Failures during recaching must not fail operations
### What changes were proposed in this pull request?
This PR prevents failures during recaching from failing write/refresh operations; a simplified sketch of the guarded pattern follows this description.
### Why are the changes needed?
After the recent changes in SPARK-54387, a write operation may now be marked as
failed even though it committed to the table successfully and only the
subsequent cache refresh failed.
### Does this PR introduce _any_ user-facing change?
Yes, `recacheByXXX` will no longer throw an exception if recaching fails.
### How was this patch tested?
This PR comes with tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
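A minimal, self-contained sketch of this guarded-recache behavior, assuming simplified stand-in types (`CacheEntry`, `rebuildEntry`) rather than Spark's actual `CacheManager` internals: a non-fatal failure while rebuilding a cache entry is logged and turned into `None`, so the write or refresh that triggered recaching still succeeds.

import scala.util.control.NonFatal

object GuardedRecacheSketch {

  // Hypothetical stand-in for a cached plan entry; not Spark's CachedData.
  final case class CacheEntry(plan: String)

  // May throw if the refreshed table schema no longer matches the cached plan
  // (e.g., a referenced column was dropped externally).
  private def rebuildEntry(old: CacheEntry): CacheEntry = {
    if (old.plan.contains("dropped_col")) {
      throw new IllegalStateException("cached plan no longer resolves")
    }
    old.copy(plan = old.plan + " (refreshed)")
  }

  // Returns the rebuilt entry, or None if rebuilding failed; the failure is
  // logged and never propagated to the operation that triggered recaching.
  def tryRebuild(old: CacheEntry): Option[CacheEntry] = {
    try {
      Some(rebuildEntry(old))
    } catch {
      case NonFatal(e) =>
        Console.err.println(s"WARN: failed to rebuild cache entry: ${e.getMessage}")
        None
    }
  }

  def main(args: Array[String]): Unit = {
    println(tryRebuild(CacheEntry("SELECT id FROM t")))          // Some(... (refreshed))
    println(tryRebuild(CacheEntry("SELECT dropped_col FROM t"))) // None, warning logged
  }
}

The cherry-picked change applies the same shape inside CacheManager via the new `tryRebuildCacheEntry` and `tryRefreshPlan` helpers, as shown in the diff below.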
Closes #53143 from aokolnychyi/spark-54424.
Authored-by: Anton Okolnychyi <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 7833e2fccf9faf24175f60db6e6187edd2e1be9f)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../apache/spark/sql/execution/CacheManager.scala | 73 +++++++++--
.../spark/sql/execution/QueryExecution.scala | 23 +++-
.../datasources/v2/V2TableRefreshUtil.scala | 22 ----
.../datasources/v2/WriteToDataSourceV2Exec.scala | 14 +-
.../sql/connector/DataSourceV2DataFrameSuite.scala | 146 +++++++++++++++++++++
5 files changed, 233 insertions(+), 45 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 5a38751b61e1..34e47084f656 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution
+import scala.util.control.NonFatal
+
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.internal.{Logging, MessageWithContext}
@@ -374,25 +376,68 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
}
needToRecache.foreach { cd =>
cd.cachedRepresentation.cacheBuilder.clearCache()
- val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
- val (newKey, newCache) = sessionWithConfigsOff.withActive {
- val refreshedPlan = V2TableRefreshUtil.refresh(sessionWithConfigsOff, cd.plan)
- val qe = sessionWithConfigsOff.sessionState.executePlan(refreshedPlan)
- qe.normalized -> InMemoryRelation(cd.cachedRepresentation.cacheBuilder, qe)
- }
- val recomputedPlan = cd.copy(plan = newKey, cachedRepresentation = newCache)
- this.synchronized {
- if (lookupCachedDataInternal(recomputedPlan.plan).nonEmpty) {
- logWarning("While recaching, data was already added to cache.")
- } else {
- cachedData = recomputedPlan +: cachedData
- CacheManager.logCacheOperation(log"Re-cached Dataframe cache entry:" +
- log"${MDC(DATAFRAME_CACHE_ENTRY, recomputedPlan)}")
+ tryRebuildCacheEntry(spark, cd).foreach { entry =>
+ this.synchronized {
+ if (lookupCachedDataInternal(entry.plan).nonEmpty) {
+ logWarning("While recaching, data was already added to cache.")
+ } else {
+ cachedData = entry +: cachedData
+ CacheManager.logCacheOperation(log"Re-cached Dataframe cache entry:" +
+ log"${MDC(DATAFRAME_CACHE_ENTRY, entry)}")
+ }
}
}
}
}
+ private def tryRebuildCacheEntry(spark: SparkSession, cd: CachedData): Option[CachedData] = {
+ val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
+ sessionWithConfigsOff.withActive {
+ tryRefreshPlan(sessionWithConfigsOff, cd.plan).map { refreshedPlan =>
+ val qe = QueryExecution.create(
+ sessionWithConfigsOff,
+ refreshedPlan,
+ refreshPhaseEnabled = false)
+ val newKey = qe.normalized
+ val newCache = InMemoryRelation(cd.cachedRepresentation.cacheBuilder, qe)
+ cd.copy(plan = newKey, cachedRepresentation = newCache)
+ }
+ }
+ }
+
+ /**
+ * Attempts to refresh table metadata loaded through the catalog.
+ *
+ * If the table state is cached (e.g., via `CACHE TABLE t`), the relation is replaced with
+ * updated metadata as long as the table ID still matches, ensuring that all schema changes
+ * are reflected. Otherwise, a new plan is produced using refreshed table metadata but
+ * retaining the original schema, provided the schema changes are still compatible with the
+ * query (e.g., adding new columns should be acceptable).
+ *
+ * Note this logic applies only to V2 tables at the moment.
+ *
+ * @return the refreshed plan if refresh succeeds, None otherwise
+ */
+ private def tryRefreshPlan(spark: SparkSession, plan: LogicalPlan): Option[LogicalPlan] = {
+ try {
+ EliminateSubqueryAliases(plan) match {
+ case r @ ExtractV2CatalogAndIdentifier(catalog, ident) if r.timeTravelSpec.isEmpty =>
+ val table = catalog.loadTable(ident)
+ if (r.table.id == table.id) {
+ Some(DataSourceV2Relation.create(table, Some(catalog), Some(ident)))
+ } else {
+ None
+ }
+ case _ =>
+ Some(V2TableRefreshUtil.refresh(spark, plan))
+ }
+ } catch {
+ case NonFatal(e) =>
+ logWarning(log"Failed to refresh plan while attempting to recache", e)
+ None
+ }
+ }
+
private[sql] def lookupCachedTable(
name: Seq[String],
resolver: Resolver): Option[LogicalPlan] = {
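The `tryRefreshPlan` doc comment above describes a reload-and-compare decision. A minimal, self-contained sketch of that decision, assuming plain stand-in types (`TableMeta` and a `Map`-based catalog) rather than Spark's catalog and plan classes: the table is reloaded, the cached relation is kept only while the table ID still matches, and any non-fatal load failure simply skips recaching.

import scala.util.control.NonFatal

object RefreshDecisionSketch {

  // Hypothetical table metadata; a real implementation would carry a plan and schema.
  final case class TableMeta(id: String, columns: Seq[String])

  // Stand-in catalog lookup; throws if the table no longer exists.
  private def loadTable(name: String, catalog: Map[String, TableMeta]): TableMeta =
    catalog.getOrElse(name, throw new NoSuchElementException(s"table $name not found"))

  // Some(refreshed) if the same table is still there, None if it was replaced
  // or the lookup failed; callers then drop the cache entry instead of failing.
  def tryRefresh(
      name: String,
      cached: TableMeta,
      catalog: Map[String, TableMeta]): Option[TableMeta] = {
    try {
      val current = loadTable(name, catalog)
      if (current.id == cached.id) Some(current) else None
    } catch {
      case NonFatal(e) =>
        Console.err.println(s"WARN: failed to refresh $name: ${e.getMessage}")
        None
    }
  }

  def main(args: Array[String]): Unit = {
    val cached = TableMeta("t-1", Seq("id", "value", "category"))
    val catalog = Map("tbl" -> TableMeta("t-1", Seq("id", "value"))) // column dropped externally
    println(tryRefresh("tbl", cached, catalog))                  // Some(TableMeta(t-1, ...))
    println(tryRefresh("tbl", cached.copy(id = "t-2"), catalog)) // None: table was replaced
    println(tryRefresh("missing", cached, catalog))              // None: load failed, warning logged
  }
}

In the actual patch, the ID comparison additionally requires an empty time-travel spec, and the non-matching relation case falls back to `V2TableRefreshUtil.refresh`.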
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 26d2078791aa..3e0aef962e71 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -66,7 +66,8 @@ class QueryExecution(
val logical: LogicalPlan,
val tracker: QueryPlanningTracker = new QueryPlanningTracker,
val mode: CommandExecutionMode.Value = CommandExecutionMode.ALL,
- val shuffleCleanupMode: ShuffleCleanupMode = DoNotCleanup) extends Logging {
+ val shuffleCleanupMode: ShuffleCleanupMode = DoNotCleanup,
+ val refreshPhaseEnabled: Boolean = true) extends Logging {
val id: Long = QueryExecution.nextExecutionId
@@ -178,7 +179,7 @@ class QueryExecution(
// for eagerly executed commands we mark this place as beginning of execution.
tracker.setReadyForExecution()
val qe = new QueryExecution(sparkSession, p, mode = mode,
- shuffleCleanupMode = shuffleCleanupMode)
+ shuffleCleanupMode = shuffleCleanupMode, refreshPhaseEnabled = refreshPhaseEnabled)
val result = QueryExecution.withInternalError(s"Eagerly executed $name failed.") {
SQLExecution.withNewExecutionId(qe, Some(name)) {
qe.executedPlan.executeCollect()
@@ -207,7 +208,11 @@ class QueryExecution(
// there may be delay between analysis and subsequent phases
// therefore, refresh captured table versions to reflect latest data
private val lazyTableVersionsRefreshed = LazyTry {
- V2TableRefreshUtil.refresh(sparkSession, commandExecuted, versionedOnly = true)
+ if (refreshPhaseEnabled) {
+ V2TableRefreshUtil.refresh(sparkSession, commandExecuted, versionedOnly = true)
+ } else {
+ commandExecuted
+ }
}
private[sql] def tableVersionsRefreshed: LogicalPlan = lazyTableVersionsRefreshed.get
@@ -569,6 +574,18 @@ object QueryExecution {
private def nextExecutionId: Long = _nextExecutionId.getAndIncrement
+ private[execution] def create(
+ sparkSession: SparkSession,
+ logical: LogicalPlan,
+ refreshPhaseEnabled: Boolean = true): QueryExecution = {
+ new QueryExecution(
+ sparkSession,
+ logical,
+ mode = CommandExecutionMode.ALL,
+ shuffleCleanupMode = determineShuffleCleanupMode(sparkSession.sessionState.conf),
+ refreshPhaseEnabled = refreshPhaseEnabled)
+ }
+
/**
* Construct a sequence of rules that are used to prepare a planned [[SparkPlan]] for execution.
* These rules will make sure subqueries are planned, make sure the data partitioning and ordering
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2TableRefreshUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2TableRefreshUtil.scala
index 945ab122d54e..151329de9e6f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2TableRefreshUtil.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2TableRefreshUtil.scala
@@ -21,7 +21,6 @@ import scala.collection.mutable
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.catalyst.analysis.AsOfVersion
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
import org.apache.spark.sql.classic.SparkSession
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, V2TableUtil}
@@ -32,27 +31,6 @@ import org.apache.spark.sql.util.SchemaValidationMode.ALLOW_NEW_FIELDS
import org.apache.spark.sql.util.SchemaValidationMode.PROHIBIT_CHANGES
private[sql] object V2TableRefreshUtil extends SQLConfHelper with Logging {
- /**
- * Pins table versions for all versioned tables in the plan.
- *
- * This method captures the current version of each versioned table by adding time travel
- * specifications. Tables that already have time travel specifications or are not versioned
- * are left unchanged.
- *
- * @param plan the logical plan to pin versions for
- * @return plan with pinned table versions
- */
- def pinVersions(plan: LogicalPlan): LogicalPlan = {
- plan transform {
- case r @ ExtractV2CatalogAndIdentifier(catalog, ident)
- if r.isVersioned && r.timeTravelSpec.isEmpty =>
- val tableName = V2TableUtil.toQualifiedName(catalog, ident)
- val version = r.table.version
- logDebug(s"Pinning table version for $tableName to $version")
- r.copy(timeTravelSpec = Some(AsOfVersion(version)))
- }
- }
-
/**
* Refreshes table metadata for tables in the plan.
*
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 6c574be91ebf..75915d97ba4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.metric.CustomMetric
import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, MergeSummaryImpl, PhysicalWriteInfoImpl, RowLevelOperationTable, Write, WriterCommitMessage, WriteSummary}
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
-import org.apache.spark.sql.execution.{SparkPlan, SQLExecution, UnaryExecNode}
+import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SQLExecution, UnaryExecNode}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.joins.BaseJoinExec
import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric, SQLMetrics}
@@ -177,7 +177,6 @@ case class ReplaceTableAsSelectExec(
query,
versionedOnly = true,
schemaValidationMode = PROHIBIT_CHANGES)
- val pinnedQuery = V2TableRefreshUtil.pinVersions(refreshedQuery)
if (catalog.tableExists(ident)) {
invalidateCache(catalog, ident)
catalog.dropTable(ident)
@@ -185,13 +184,15 @@ case class ReplaceTableAsSelectExec(
throw QueryCompilationErrors.cannotReplaceMissingTableError(ident)
}
val tableInfo = new TableInfo.Builder()
- .withColumns(getV2Columns(pinnedQuery.schema, catalog.useNullableQuerySchema))
+ .withColumns(getV2Columns(refreshedQuery.schema, catalog.useNullableQuerySchema))
.withPartitions(partitioning.toArray)
.withProperties(properties.asJava)
.build()
val table = Option(catalog.createTable(ident, tableInfo))
.getOrElse(catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
- writeToTable(catalog, table, writeOptions, ident, pinnedQuery, overwrite = true)
+ writeToTable(
+ catalog, table, writeOptions, ident, refreshedQuery,
+ overwrite = true, refreshPhaseEnabled = false)
}
}
@@ -764,7 +765,8 @@ private[v2] trait V2CreateTableAsSelectBaseExec extends LeafV2CommandExec {
writeOptions: Map[String, String],
ident: Identifier,
query: LogicalPlan,
- overwrite: Boolean): Seq[InternalRow] = {
+ overwrite: Boolean,
+ refreshPhaseEnabled: Boolean = true): Seq[InternalRow] = {
Utils.tryWithSafeFinallyAndFailureCallbacks({
val relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
val writeCommand = if (overwrite) {
@@ -772,7 +774,7 @@ private[v2] trait V2CreateTableAsSelectBaseExec extends LeafV2CommandExec {
} else {
AppendData.byPosition(relation, query, writeOptions)
}
- val qe = session.sessionState.executePlan(writeCommand)
+ val qe = QueryExecution.create(session, writeCommand, refreshPhaseEnabled)
qe.assertCommandExecuted()
DataSourceV2Utils.commitStagedChanges(sparkContext, table, metrics)
Nil
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
index 5f4aee277831..cdb498c8f2bf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSel
import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, DefaultValue, Identifier, InMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableInfo}
import org.apache.spark.sql.connector.catalog.BasicInMemoryTableCatalog
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, UpdateColumnDefaultValue}
+import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.connector.catalog.TableWritePrivilege
import org.apache.spark.sql.connector.catalog.TruncatableTable
import org.apache.spark.sql.connector.expressions.{ApplyTransform, GeneralScalarExpression, LiteralValue, Transform}
@@ -1894,6 +1895,151 @@ class DataSourceV2DataFrameSuite
}
}
+ test("SPARK-54424: refresh table cache on schema changes (column removed)") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, value INT, category STRING) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 10, 'A'), (2, 20, 'B'), (3, 30, 'A')")
+
+ // cache table
+ spark.table(t).cache()
+
+ // verify caching works as expected
+ assertCached(spark.table(t))
+ checkAnswer(
+ spark.table(t),
+ Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3, 30, "A")))
+
+ // evolve table directly to mimic external changes
+ // these external changes make cached plan invalid (column is no longer there)
+ val change = TableChange.deleteColumn(Array("category"), false)
+ catalog("testcat").alterTable(ident, change)
+
+ // refresh table is supposed to trigger recaching
+ spark.sql(s"REFRESH TABLE $t")
+
+ // recaching is expected to succeed
+ assert(spark.sharedState.cacheManager.numCachedEntries == 1)
+
+ // verify cache reflects latest schema and data
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 10), Row(2, 20), Row(3, 30)))
+ }
+ }
+
+ test("SPARK-54424: refresh table cache on schema changes (column added)") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, value INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 10), (2, 20), (3, 30)")
+
+ // cache table
+ spark.table(t).cache()
+
+ // verify caching works as expected
+ assertCached(spark.table(t))
+ checkAnswer(
+ spark.table(t),
+ Seq(Row(1, 10), Row(2, 20), Row(3, 30)))
+
+ // evolve table directly to mimic external changes
+ // these external changes make cached plan invalid (table state has changed)
+ val change = TableChange.addColumn(Array("category"), StringType, true)
+ catalog("testcat").alterTable(ident, change)
+
+ // refresh table is supposed to trigger recaching
+ spark.sql(s"REFRESH TABLE $t")
+
+ // recaching is expected to succeed
+ assert(spark.sharedState.cacheManager.numCachedEntries == 1)
+
+ // verify cache reflects latest schema and data
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 10, null), Row(2, 20, null), Row(3, 30, null)))
+ }
+ }
+
+ test("SPARK-54424: successfully refresh cache with compatible schema
changes") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, value INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 10), (2, 20), (3, 30)")
+
+ // cache query
+ val df = spark.table(t).filter("id < 100")
+ df.cache()
+
+ // verify caching works as expected
+ assertCached(spark.table(t).filter("id < 100"))
+ checkAnswer(
+ spark.table(t).filter("id < 100"),
+ Seq(Row(1, 10), Row(2, 20), Row(3, 30)))
+
+ // evolve table directly to mimic external changes
+ // adding columns should be OK
+ val change = TableChange.addColumn(Array("category"), StringType, true)
+ catalog("testcat").alterTable(ident, change)
+
+ // refresh table is supposed to trigger recaching
+ spark.sql(s"REFRESH TABLE $t")
+
+ // recaching is expected to succeed
+ assert(spark.sharedState.cacheManager.numCachedEntries == 1)
+
+ // verify derived queries still benefit from refreshed cache
+ assertCached(df.filter("id > 0"))
+ checkAnswer(df.filter("id > 0"), Seq(Row(1, 10), Row(2, 20), Row(3, 30)))
+
+ // add more data
+ sql(s"INSERT INTO $t VALUES (4, 40, '40')")
+
+ // verify derived queries still benefit from refreshed cache
+ assertCached(df.filter("id > 0"))
+ checkAnswer(df.filter("id > 0"), Seq(Row(1, 10), Row(2, 20), Row(3, 30),
Row(4, 40)))
+
+ // verify latest schema is propagated (new column has NULL values for existing rows)
+ checkAnswer(
+ spark.table(t),
+ Seq(Row(1, 10, null), Row(2, 20, null), Row(3, 30, null), Row(4, 40, "40")))
+ }
+ }
+
+ test("SPARK-54424: inability to refresh cache shouldn't fail operations") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, value INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 10), (2, 20), (3, 30)")
+
+ // cache query
+ val df = spark.table(t).filter("id < 100")
+ df.cache()
+
+ // verify caching works as expected
+ assertCached(spark.table(t).filter("id < 100"))
+ checkAnswer(
+ spark.table(t).filter("id < 100"),
+ Seq(Row(1, 10), Row(2, 20), Row(3, 30)))
+
+ // evolve table directly to mimic external changes
+ // removing columns should make the cached plan invalid
+ val change = TableChange.deleteColumn(Array("value"), false)
+ catalog("testcat").alterTable(ident, change)
+
+ // refresh table is supposed to trigger recaching
+ spark.sql(s"REFRESH TABLE $t")
+
+ // recaching is expected to fail
+ assert(spark.sharedState.cacheManager.isEmpty)
+
+ // verify latest schema is propagated
+ checkAnswer(spark.table(t), Seq(Row(1), Row(2), Row(3)))
+ }
+ }
+
private def pinTable(catalogName: String, ident: Identifier, version: String): Unit = {
catalog(catalogName) match {
case inMemory: BasicInMemoryTableCatalog => inMemory.pinTable(ident, version)