This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 665a428cb6cd [SPARK-53905][SQL] Refactor RelationResolution to enable
code reuse
665a428cb6cd is described below
commit 665a428cb6cd14a4bd5bc895797180749d4ed09d
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Wed Oct 29 11:34:25 2025 -0700
[SPARK-53905][SQL] Refactor RelationResolution to enable code reuse
### What changes were proposed in this pull request?
This PR refactor RelationResolution to enable code reuse.
### Why are the changes needed?
These changes are needed to simplify subsequent PRs.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests in `PlanResolutionSuite` already cover plan ID cloning and
cache hits.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52781 from aokolnychyi/spark-53905.
Authored-by: Anton Okolnychyi <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../sql/catalyst/analysis/RelationResolution.scala | 54 +++++++++++++---------
1 file changed, 31 insertions(+), 23 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
index 9e9ad63b3a44..ea5836b8ec2d 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.catalyst.analysis
+import scala.collection.mutable
+
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.SQLConfHelper
@@ -49,8 +51,13 @@ class RelationResolution(override val catalogManager:
CatalogManager)
with Logging
with LookupCatalog
with SQLConfHelper {
+
+ type CacheKey = (Seq[String], Option[TimeTravelSpec])
+
val v1SessionCatalog = catalogManager.v1SessionCatalog
+ private def relationCache: mutable.Map[CacheKey, LogicalPlan] =
AnalysisContext.get.relationCache
+
/**
* If we are resolving database objects (relations, functions, etc.) inside
views, we may need to
* expand single or multi-part identifiers with the current catalog and
namespace of when the
@@ -109,12 +116,9 @@ class RelationResolution(override val catalogManager:
CatalogManager)
}.orElse {
expandIdentifier(u.multipartIdentifier) match {
case CatalogAndIdentifier(catalog, ident) =>
- val key =
- (
- (catalog.name +: ident.namespace :+
ident.name).toImmutableArraySeq,
- finalTimeTravelSpec
- )
- AnalysisContext.get.relationCache
+ val key = toCacheKey(catalog, ident, finalTimeTravelSpec)
+ val planId = u.getTagValue(LogicalPlan.PLAN_ID_TAG)
+ relationCache
.get(key)
.map { cache =>
val cachedRelation = cache.transform {
@@ -123,13 +127,7 @@ class RelationResolution(override val catalogManager:
CatalogManager)
newRelation.copyTagsFrom(multi)
newRelation
}
- u.getTagValue(LogicalPlan.PLAN_ID_TAG)
- .map { planId =>
- val cachedConnectRelation = cachedRelation.clone()
- cachedConnectRelation.setTagValue(LogicalPlan.PLAN_ID_TAG,
planId)
- cachedConnectRelation
- }
- .getOrElse(cachedRelation)
+ cloneWithPlanId(cachedRelation, planId)
}
.orElse {
val writePrivilegesString =
@@ -144,16 +142,8 @@ class RelationResolution(override val catalogManager:
CatalogManager)
u.isStreaming,
finalTimeTravelSpec
)
- loaded.foreach(AnalysisContext.get.relationCache.update(key, _))
- u.getTagValue(LogicalPlan.PLAN_ID_TAG)
- .map { planId =>
- loaded.map { loadedRelation =>
- val loadedConnectRelation = loadedRelation.clone()
- loadedConnectRelation.setTagValue(LogicalPlan.PLAN_ID_TAG,
planId)
- loadedConnectRelation
- }
- }
- .getOrElse(loaded)
+ loaded.foreach(relationCache.update(key, _))
+ loaded.map(cloneWithPlanId(_, planId))
}
case _ => None
}
@@ -263,4 +253,22 @@ class RelationResolution(override val catalogManager:
CatalogManager)
}
} else None
}
+
+ private def toCacheKey(
+ catalog: CatalogPlugin,
+ ident: Identifier,
+ timeTravelSpec: Option[TimeTravelSpec] = None): CacheKey = {
+ ((catalog.name +: ident.namespace :+ ident.name).toImmutableArraySeq,
timeTravelSpec)
+ }
+
+ private def cloneWithPlanId(plan: LogicalPlan, planId: Option[Long]):
LogicalPlan = {
+ planId match {
+ case Some(id) =>
+ val clone = plan.clone()
+ clone.setTagValue(LogicalPlan.PLAN_ID_TAG, id)
+ clone
+ case None =>
+ plan
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]