This is an automated email from the ASF dual-hosted git repository.
dtenedor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6cc6e8ad4a2a [SPARK-56750] default path config
6cc6e8ad4a2a is described below
commit 6cc6e8ad4a2a5cd4a9a2e46ab46d5ae42643eb22
Author: Serge Rielau <[email protected]>
AuthorDate: Sun May 10 12:02:01 2026 -0700
[SPARK-56750] default path config
### What changes were proposed in this pull request?
This PR implements
**[SPARK-56750](https://issues.apache.org/jira/browse/SPARK-56750)** by adding
a default SQL PATH (parallel to the default catalog) and wiring path-driven
semantics through function and namespace resolution when
`spark.sql.path.enabled` is true.
#### Core feature
- **New config `spark.sql.defaultPath`** (`SQLConf.DEFAULT_PATH`):
  - String session conf (default empty), accepts the full SET PATH grammar.
  - When PATH is enabled and no explicit `SET PATH` was issued, this value
    becomes the effective SQL PATH (mirroring how `currentCatalog` falls back to
    `spark.sql.defaultCatalog`).
  - `SET PATH = DEFAULT_PATH` expands to this configured default.
  - An inner `DEFAULT_PATH` token inside the conf value resolves to the
    Spark built-in default ordering (cycle-safe expansion).
  - The conf is validated for syntax at set time; redundant entries are
    tolerated at lookup time (first-match resolution makes them harmless).
- **Shared SET PATH grammar**: A new `singlePathElementList` rule and
`parsePathElements` parser entry let `spark.sql.defaultPath` reuse the exact
`SET PATH` grammar without re-implementation.
- **`CatalogManager` PATH materialization**: A single `sessionPathEntries`
resolves stored `SET PATH`, parsed `DEFAULT_PATH`, and the seeded
`defaultPathOrder` fallback into one effective path. `CURRENT_PATH()` and
unqualified function/relation resolution all read from this single source of
truth.
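
The fallback chain described above can be sketched in plain Scala. This is a minimal model, not the actual `CatalogManager` code: the `Entry` types, `seededDefault`, and the function names are hypothetical, and the seeded order shown is only a stand-in for `defaultPathOrder`. It also assumes that an empty conf value falls through to the seeded default.

```scala
// Hypothetical model of the PATH fallback; none of these names are Spark APIs.
object EffectivePathSketch {
  sealed trait Entry
  final case class Schema(parts: Seq[String]) extends Entry
  case object DefaultPathToken extends Entry // stands for the DEFAULT_PATH keyword

  // Assumed stand-in for the seeded built-in ordering (`defaultPathOrder`).
  val seededDefault: Seq[Entry] = Seq(
    Schema(Seq("system", "builtin")),
    Schema(Seq("system", "session")))

  // Expand the configured default. A DEFAULT_PATH token inside the conf value
  // resolves to the seeded order, so the expansion cannot recurse.
  def expandConf(confDefault: Seq[Entry]): Seq[Entry] =
    if (confDefault.isEmpty) seededDefault
    else confDefault.flatMap {
      case DefaultPathToken => seededDefault
      case other => Seq(other)
    }

  // Expand an explicit SET PATH list. Here DEFAULT_PATH means the configured default.
  def expandExplicit(entries: Seq[Entry], confDefault: Seq[Entry]): Seq[Entry] =
    entries.flatMap {
      case DefaultPathToken => expandConf(confDefault)
      case other => Seq(other)
    }

  // Explicit SET PATH wins; otherwise the conf default; otherwise the seeded order.
  def effectivePath(stored: Option[Seq[Entry]], confDefault: Seq[Entry]): Seq[Entry] =
    stored match {
      case Some(explicit) => expandExplicit(explicit, confDefault)
      case None => expandConf(confDefault)
    }
}
```

Keeping the two expansion functions separate is what makes the cycle break explicit in this sketch: the conf-level expansion never calls back into itself.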
#### Path-driven function resolution
- **Fast-path kinds derived from the live PATH**: `SessionCatalog`'s
unqualified-lookup fast path receives an ordered list of `system.builtin` /
`system.session` kinds from the effective PATH (in path order) via a
path-driven provider. This replaces the previous reliance on
`spark.sql.functionResolution.sessionOrder` (which is now used solely as a seed
for `defaultPathOrder` and may eventually be retired).
- **The path is the path**: `current_path` is treated like
`current_catalog` and `current_schema` — we don't distinguish whether it was
set via `SET PATH`, via the `spark.sql.defaultPath` conf, or by the seeded
default. Whatever is on the path is searched as written. Placing
`system.session` after a user catalog is the user's authorization for
unqualified temp functions to resolve through that position.
- **Security check via path order**: Creating a temp function with a
builtin's name is blocked when `system.session` is searched before
`system.builtin`. The "no injection of temp over persistent" property is
enforced by path order itself: when a user catalog precedes `system.session`,
persistent resolution wins for unqualified names.
- **`COUNT(*) → COUNT(1)` rewrite gate**: The analyzer rewrite is skipped when a
temp `count` would shadow the builtin under the live PATH. It shares the same
canonical predicate (`FunctionResolution.isSessionBeforeBuiltinInPath`) as the
security check.
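
As a rough illustration of the shared predicate, the sketch below extracts the `system.builtin` / `system.session` kinds from a path in order and checks whether session comes first. It mirrors the shape of `systemFunctionKindsFromPath(path).headOption.contains(Temp)` from the diff, but the object, the path representation as `Seq[Seq[String]]`, and `mayCreateTemp` are assumptions made for this example.

```scala
import java.util.Locale

// Hypothetical, self-contained model; not the Spark implementation.
object PathKindsSketch {
  sealed trait Kind
  case object Builtin extends Kind
  case object Temp extends Kind

  // Keep only the system.builtin / system.session entries, in path order.
  def systemKinds(path: Seq[Seq[String]]): Seq[Kind] =
    path.map(_.map(_.toLowerCase(Locale.ROOT))).collect {
      case Seq("system", "builtin") => Builtin
      case Seq("system", "session") => Temp
    }

  // True when system.session is searched before system.builtin.
  def isSessionBeforeBuiltin(path: Seq[Seq[String]]): Boolean =
    systemKinds(path).headOption.contains(Temp)

  // Create-time guard: a temp function named like a builtin is refused
  // only when the path would let it shadow that builtin.
  def mayCreateTemp(name: String, builtins: Set[String], path: Seq[Seq[String]]): Boolean =
    !(isSessionBeforeBuiltin(path) && builtins.contains(name.toLowerCase(Locale.ROOT)))
}
```

Under this model, a path of `system.builtin, <user>, system.session` yields the kinds `Builtin, Temp`, so both the rewrite gate and the create-time guard treat the builtin as winning.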
#### Layering
- Resolution-order logic lives at the Strategy layer
(`FunctionResolution`), not Coordination (`CatalogManager`). `CatalogManager`
provides path materialization (`sqlResolutionPathEntries`) and pure data
helpers (`systemFunctionKindsFromPath`) — it doesn't decide resolution order.
`SessionCatalog`'s default kinds provider derives from `conf.systemPathOrder`
(the seeded default path) and never branches on the legacy resolution-order
conf.
#### Deadlock-safe `SessionCatalog` lookups
- `lookupBuiltinOrTempTableFunction` and `lookupFunctionInfo` are
intentionally not synchronized on `SessionCatalog`. The path-driven kinds
provider may acquire `CatalogManager.synchronized`; another thread holding that
lock can call into `SessionCatalog` (e.g. via `setCurrentNamespace`), so
synchronizing the public lookup methods would introduce lock-order inversion.
Tighter `synchronized` blocks are kept around the temp-view-context
post-processing where they're actually needed.
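
The lock-ordering argument can be illustrated with a toy model. The `Manager` and `Catalog` classes below are hypothetical stand-ins for `CatalogManager` and `SessionCatalog`; only the locking shape is meant to match the description. The writer takes the manager lock and then the catalog lock, so the reader must not hold the catalog lock while asking the manager for the path.

```scala
// Toy model of the lock ordering; class and method names are illustrative only.
object LockOrderSketch {
  final class Manager {
    private var namespace: String = "default"

    // Reads path state under the manager lock.
    def pathKinds(): Seq[String] = synchronized {
      Seq("builtin", "session", namespace)
    }

    // Holds the manager lock, then takes the catalog lock (manager -> catalog).
    def setNamespace(ns: String, catalog: Catalog): Unit = synchronized {
      namespace = ns
      catalog.onNamespaceChanged(ns)
    }
  }

  final class Catalog(manager: Manager) {
    private var current: String = "default"

    def onNamespaceChanged(ns: String): Unit = synchronized { current = ns }

    // Deliberately NOT synchronized: taking the catalog lock here and then the
    // manager lock inside pathKinds() would be the inverse order (catalog -> manager).
    def lookup(name: String): Option[String] =
      manager.pathKinds().headOption.map(kind => s"$kind.$name")
  }

  // Runs a reader and a writer concurrently; returns true if both finished.
  def run(iterations: Int): Boolean = {
    val manager = new Manager
    val catalog = new Catalog(manager)
    val reader = new Thread(() => (1 to iterations).foreach(_ => catalog.lookup("f")))
    val writer = new Thread(() =>
      (1 to iterations).foreach(i => manager.setNamespace(s"ns$i", catalog)))
    reader.start()
    writer.start()
    reader.join(5000)
    writer.join(5000)
    !reader.isAlive && !writer.isAlive
  }
}
```

If `lookup` were marked `synchronized`, the two threads could each hold one lock while waiting for the other, which is the inversion the change avoids.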
#### Duplicate validation
- `SET PATH` rejects **static** duplicates (literal-vs-literal, repeated
`current_schema` / `current_database`) at statement time as a typo guard.
- A literal that happens to match the live `current_schema` is **not**
flagged: the duplicate is `USE SCHEMA`-state-dependent and harmless under
first-match resolution.
- `spark.sql.defaultPath` runs no semantic duplicate check at lookup time:
the previous lookup-time validation could wedge a session by throwing on every
unqualified resolution after `USE SCHEMA` made `current_schema` collide with a
literal path entry.
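
A minimal sketch of the static check follows, assuming a simplified element model. The `Element` types and `firstStaticDuplicate` are hypothetical (the real helper is `validateNoStaticDuplicates` on `PathElement`, whose signature is not shown here), and case-insensitive comparison of literals is an assumption of this example.

```scala
import java.util.Locale

// Hypothetical model of statement-time duplicate detection.
object StaticDuplicateSketch {
  sealed trait Element
  final case class Schema(parts: Seq[String]) extends Element
  case object CurrentSchema extends Element // current_schema / current_database marker

  // Returns the first element that is repeated as written. The marker is compared
  // only with itself, never with the live current schema, so a literal that happens
  // to equal the current schema is not reported.
  def firstStaticDuplicate(path: Seq[Element]): Option[Element] = {
    val normalized: Seq[Element] = path.map {
      case Schema(parts) => Schema(parts.map(_.toLowerCase(Locale.ROOT)))
      case other => other
    }
    normalized.diff(normalized.distinct).headOption
  }
}
```

Because the marker stays unresolved during the check, the result does not depend on `USE SCHEMA` state, which matches the typo-guard intent described above.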
#### `DROP TEMPORARY FUNCTION` correctness
- Unqualified `DROP TEMPORARY FUNCTION <builtin>` without `IF EXISTS`
raises `FORBIDDEN_OPERATION` against the builtin name as captured.
- `IF EXISTS` remains a no-op when no temp function exists.
- Qualified temporary namespaces (`session.foo`, `system.session.foo`)
always target the temp registry.
#### `PathElement` AST
- New `PathElement` AST in catalyst
(`org.apache.spark.sql.connector.catalog.PathElement`, `private[sql]`) with
`expand` and `validateNoStaticDuplicates` helpers shared between
`SetPathCommand` and `CatalogManager.confDefaultPathEntries`.
#### Performance
- `CatalogManager.confDefaultPathEntries` caches expanded entries by
`(trimmed conf value, sessionFunctionResolutionOrder)`; on conf-stable sessions
the lookup hot path is one `AtomicReference.get`. `CurrentSchemaEntry` markers
are preserved unresolved so the cache stays valid across `USE SCHEMA`.
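
The caching idea can be sketched as a single `AtomicReference` holding the last key and its parsed entries. Everything here is illustrative: `Key`, `Cached`, and `Cache` are hypothetical names, entries are plain strings, and the parser is passed in as a function rather than being the real `parsePathElements`.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical single-slot cache keyed by (trimmed conf value, resolution order).
object PathCacheSketch {
  final case class Key(confValue: String, resolutionOrder: String)
  final case class Cached(key: Key, entries: Seq[String])

  final class Cache(parse: String => Seq[String]) {
    private val ref = new AtomicReference[Option[Cached]](None)

    def entries(rawConf: String, resolutionOrder: String): Seq[String] = {
      val key = Key(rawConf.trim, resolutionOrder)
      ref.get() match {
        // Hot path: one atomic read when the conf has not changed.
        case Some(cached) if cached.key == key => cached.entries
        case _ =>
          // Racing threads may both parse; the result is identical, so last write wins.
          val fresh = Cached(key, parse(key.confValue))
          ref.set(Some(fresh))
          fresh.entries
      }
    }
  }
}
```

In this sketch nothing session-dependent is stored in the cached entries, which is the same reason the commit keeps `CurrentSchemaEntry` markers unresolved.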
#### Tests
- `SetPathSuite` (47 tests) covers: lazy fallback, explicit override, `SET
PATH = DEFAULT_PATH`, cycle break, invalid value, PATH disabled, path-driven
security, user-only path, duplicate tolerance for `DEFAULT_PATH`, static
duplicate rejection at SET PATH, the explicit-interleaved path case
(`system.builtin, <user>, system.session` resolving a temp), and the
leading-system-kinds else-branch.
- `FunctionQualificationSuite` and `RelationQualificationSuite` continue to
pass unchanged (130/130 across the three suites).
### Why are the changes needed?
Users need an OSS way to define a default SQL PATH before any explicit `SET
PATH`, mirroring how `spark.sql.defaultCatalog` provides a session default for
the current catalog. The change also fixes a semantic gap where path-driven
function-resolution shortcuts (the `COUNT(*)` rewrite gate, the temp-vs-builtin
security check) ignored `SET PATH` and only consulted the legacy
`spark.sql.functionResolution.sessionOrder` proxy — so `SET PATH =
system.session, system.builtin, ...` did not b [...]
### Does this PR introduce *any* user-facing change?
**Yes.**
- New conf: **`spark.sql.defaultPath`**.
- With `spark.sql.path.enabled=true`, sessions without explicit `SET PATH`
resolve using the configured default path.
- `SET PATH = DEFAULT_PATH` and `CURRENT_PATH()` reflect the configured
default.
- Function resolution order for unqualified names follows the effective
PATH (post-`SET PATH`, with `DEFAULT_PATH` and `defaultPathOrder` fallbacks).
The temp-vs-builtin security check and the `COUNT(*) → COUNT(1)` rewrite gate
now reflect `SET PATH` changes.
- Subtle behavior change in the previously-impossible "temp-only function
in `sessionFunctionResolutionOrder=last`" case: the unqualified call now
resolves to the temp function instead of failing with `UNRESOLVED_ROUTINE`.
This matches the documented intent of "`builtin → persistent → session`". The
"no temp injection over persistent" property is preserved by path order.
### How was this patch tested?
- `build/sbt "sql/testOnly org.apache.spark.sql.SetPathSuite"`
- `build/sbt "sql/testOnly *FunctionQualificationSuite
*RelationQualificationSuite"`
- `build/sbt "sql/testOnly *DDLSuite -- -z \"drop built-in function\""`
- `build/sbt "catalyst/Compile/compile" "sql/Compile/compile"
"sql/Test/compile"`
### Was this patch authored or co-authored using generative AI tooling?
Yes. Some implementation and tests were iterated with coding-assistant
tooling; the author reviewed and owns the final patch.
Closes #55717 from srielau/SPARK-56750-default-path.
Authored-by: Serge Rielau <[email protected]>
Signed-off-by: Daniel Tenedorio <[email protected]>
---
.../apache-spark-scala-lint-before-commit.mdc | 11 +
AGENTS.md | 2 +
.../spark/sql/catalyst/parser/SqlBaseParser.g4 | 4 +
.../spark/sql/catalyst/analysis/Analyzer.scala | 7 +-
.../sql/catalyst/analysis/FunctionResolution.scala | 16 ++
.../analysis/resolver/FunctionResolver.scala | 41 ++-
.../analysis/resolver/FunctionResolverUtils.scala | 19 +-
.../resolver/HigherOrderFunctionResolver.scala | 2 +-
.../analysis/resolver/ResolutionValidator.scala | 6 +
.../sql/catalyst/analysis/resolver/Resolver.scala | 12 +
.../sql/catalyst/catalog/SessionCatalog.scala | 100 ++++++--
.../sql/catalyst/parser/AbstractSqlParser.scala | 13 +
.../spark/sql/catalyst/parser/AstBuilder.scala | 22 +-
.../sql/connector/catalog/CatalogManager.scala | 111 ++++++--
.../spark/sql/connector/catalog/PathElement.scala | 141 +++++++++++
.../org/apache/spark/sql/internal/SQLConf.scala | 27 ++
.../spark/sql/execution/SparkSqlParser.scala | 10 +-
.../command/LeafRunnableCommandResolver.scala | 40 +++
.../sql/execution/command/SetPathCommand.scala | 81 +-----
.../spark/sql/execution/command/functions.scala | 10 +-
.../sql/internal/BaseSessionStateBuilder.scala | 7 +-
.../scala/org/apache/spark/sql/SetPathSuite.scala | 278 ++++++++++++++++++++-
.../spark/sql/hive/HiveSessionStateBuilder.scala | 2 +
23 files changed, 817 insertions(+), 145 deletions(-)
diff --git a/.cursor/rules/apache-spark-scala-lint-before-commit.mdc b/.cursor/rules/apache-spark-scala-lint-before-commit.mdc
new file mode 100644
index 000000000000..f3a1d8f746e9
--- /dev/null
+++ b/.cursor/rules/apache-spark-scala-lint-before-commit.mdc
@@ -0,0 +1,11 @@
+---
+description: Run Spark Scala lint before any commit; never commit without passing lint-scala
+alwaysApply: true
+---
+
+# Apache Spark: lint before commit
+
+- Before **any** `git commit` that changes Scala (or when preparing a commit batch), run `./dev/lint-scala` from the repo root and fix all Scalastyle and Scalafmt failures.
+- **Do not** tell the user a change is ready to commit or push until `./dev/lint-scala` has passed in this environment (or the user has confirmed it passed).
+- Common CI failures: **line length > 100** (wrap Scaladoc and code), import order, spacing.
+- If `./dev/lint-scala` fixes files (e.g. scalafmt), include those fixes in the same commit.
diff --git a/AGENTS.md b/AGENTS.md
index 96f5b7917cae..3361623f9f18 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -12,6 +12,8 @@ Before the first code edit or running test in a session, ensure a clean working
- **New edits**: ask the user to choose: create a new git worktree from `<upstream>/master` and work from there (recommended), or create and switch to a new branch from `<upstream>/master`.
- **Running tests**: use `<upstream>/master`.
+5. **Commits:** Before committing or pushing changes that touch Scala (or when unsure), run `./dev/lint-scala` from the repo root and fix all failures. Do not consider a change merge-ready until lint passes; CI runs the same checks.
+
## Development Notes
SQL golden file tests are managed by `SQLQueryTestSuite` and its variants. Read the class documentation before running or updating these tests. DO NOT edit the generated golden files (`.sql.out`) directly. Always regenerate them when needed, and carefully review the diff to make sure it's expected.
diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 735921681cdc..89da0d4645c4 100644
--- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -217,6 +217,10 @@ singleTableSchema
: colTypeList EOF
;
+singlePathElementList
+ : pathElement (COMMA pathElement)* EOF
+ ;
+
singleRoutineParamList
: colDefinitionList EOF
;
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 95b5c54f782f..f31354179674 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1976,14 +1976,15 @@ class Analyzer(
* This is used for special syntax transformations (e.g., COUNT(*) -> COUNT(1)) that
* should only apply to builtin functions, not to user-defined functions.
*
- * In legacy mode (sessionOrder="first"), temp functions shadow builtins, so an
- * unqualified name that matches a temp function should NOT be treated as builtin.
+ * When the effective SQL PATH puts `system.session` before `system.builtin`, temp
+ * functions shadow builtins, so an unqualified name that matches a temp function
+ * should NOT be treated as builtin.
*/
private def matchesFunctionName(nameParts: Seq[String], expectedName: String): Boolean = {
if (!FunctionResolution.isUnqualifiedOrBuiltinFunctionName(nameParts, expectedName)) {
return false
}
- if (nameParts.size == 1 && conf.sessionFunctionResolutionOrder == "first") {
+ if (nameParts.size == 1 && functionResolution.isSessionBeforeBuiltinInPath) {
val v1Catalog = catalogManager.v1SessionCatalog
!v1Catalog.isTemporaryFunction(FunctionIdentifier(nameParts.head))
} else {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala
index 4721425d2cb5..e3dbcc4b6ef7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala
@@ -76,6 +76,22 @@ class FunctionResolution(
nameParts.length == 3 &&
nameParts.head.equalsIgnoreCase(CatalogManager.SYSTEM_CATALOG_NAME)
+ /**
+ * True iff `system.session` is searched before `system.builtin` in the effective SQL PATH.
+ *
+ * Drives the `count(*) -> count(1)` rewrite (which must skip transformation when a temp
+ * `count` shadows the builtin) and the `SessionCatalog` security check that blocks creating
+ * a temp function with a builtin's name. Reads the live PATH via `CatalogManager` and
+ * applies the same kinds extraction that drives `SessionCatalog`'s fast-path provider, so
+ * the predicate stays in sync with the lookup loop's actual order.
+ */
+ def isSessionBeforeBuiltinInPath: Boolean = {
+ val path = catalogManager.sqlResolutionPathEntries(
+ catalogManager.currentCatalog.name(), catalogManager.currentNamespace.toSeq)
+ CatalogManager.systemFunctionKindsFromPath(path).headOption
+ .contains(org.apache.spark.sql.catalyst.catalog.SessionCatalog.Temp)
+ }
+
/**
* Produces the ordered list of candidate names for resolution. Expansion
happens in two cases:
*
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/FunctionResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/FunctionResolver.scala
index 1a8658bb764d..d411dffba0ea 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/FunctionResolver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/FunctionResolver.scala
@@ -20,10 +20,12 @@ package org.apache.spark.sql.catalyst.analysis.resolver
import scala.util.Random
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.{
FunctionResolution,
UnresolvedFunction,
- UnresolvedSeed
+ UnresolvedSeed,
+ UnresolvedStar
}
import org.apache.spark.sql.catalyst.expressions.{
BinaryArithmetic,
@@ -31,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{
Expression,
ExpressionWithRandomSeed,
InheritAnalysisRules,
+ Literal,
ResolvedCollation,
TryEval,
UnresolvedCollation,
@@ -60,7 +63,7 @@ import org.apache.spark.sql.catalyst.util.CollationFactory
*/
class FunctionResolver(
expressionResolver: ExpressionResolver,
- functionResolution: FunctionResolution,
+ protected val functionResolution: FunctionResolution,
aggregateExpressionResolver: AggregateExpressionResolver,
binaryArithmeticResolver: BinaryArithmeticResolver)
extends TreeNodeResolver[UnresolvedFunction, Expression]
@@ -117,9 +120,10 @@ class FunctionResolver(
* - Apply timezone, if the resulting expression is [[TimeZoneAwareExpression]].
*/
override def resolve(unresolvedFunction: UnresolvedFunction): Expression = {
+ val effectiveUnresolved = restoreCountStarAfterParserForTempShadow(unresolvedFunction)
val expressionInfo = functionResolution.lookupBuiltinOrTempFunction(
- nameParts = unresolvedFunction.nameParts,
- unresolvedFunc = Some(unresolvedFunction)
+ nameParts = effectiveUnresolved.nameParts,
+ unresolvedFunc = Some(effectiveUnresolved)
)
val expressionResolutionContext = expressionResolutionContextStack.peek()
@@ -133,7 +137,7 @@ class FunctionResolver(
val functionWithResolvedChildren =
withResolvedChildren(
- unresolvedExpression = unresolvedFunction,
+ unresolvedExpression = effectiveUnresolved,
resolveChild = expressionResolver.resolve _
).asInstanceOf[UnresolvedFunction]
@@ -145,6 +149,33 @@ class FunctionResolver(
}
}
+ /**
+ * SQL parsing turns `COUNT(*)` into `COUNT(1)` before analysis ([[AstBuilder]]). When PATH
+ * orders session before builtin and a temporary `count` shadows the builtin, that rewrite must
+ * be undone so arguments expand from the child operator output (aligning with fixed-point
+ * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveReferences]] / `matchesFunctionName`).
+ */
+ private def restoreCountStarAfterParserForTempShadow(
+ unresolvedFunction: UnresolvedFunction): UnresolvedFunction = {
+ if (unresolvedFunction.nameParts.length != 1 ||
+ unresolvedFunction.isDistinct ||
+ !unresolvedFunction.nameParts.head.equalsIgnoreCase("count")) {
+ return unresolvedFunction
+ }
+ unresolvedFunction.arguments match {
+ case Seq(lit: Literal)
+ if lit.value != null &&
+ lit.value == 1 &&
+ functionResolution.isSessionBeforeBuiltinInPath &&
+ functionResolution.catalogManager.v1SessionCatalog.isTemporaryFunction(
+ FunctionIdentifier(unresolvedFunction.nameParts.head)) =>
+ val expanded = expressionResolver.expandStarExpressions(Seq(UnresolvedStar(None)))
+ unresolvedFunction.copy(arguments = expanded)
+ case _ =>
+ unresolvedFunction
+ }
+ }
+
private def handlePartiallyResolvedFunction(partiallyResolvedFunction: Expression): Expression = {
val expressionResolutionContext = expressionResolutionContextStack.peek()
val resolvedFunction = partiallyResolvedFunction match {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/FunctionResolverUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/FunctionResolverUtils.scala
index 503c94fc9cdf..3c5a3f1832e8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/FunctionResolverUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/FunctionResolverUtils.scala
@@ -19,7 +19,9 @@ package org.apache.spark.sql.catalyst.analysis.resolver
import java.util.Locale
+import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.{
+ FunctionResolution,
ResolvedStar,
Star,
UnresolvedFunction,
@@ -35,6 +37,7 @@ import org.apache.spark.sql.internal.SQLConf
*/
trait FunctionResolverUtils {
protected def expressionResolver: ExpressionResolver
+ protected def functionResolution: FunctionResolution
protected def conf: SQLConf
private val scopes = expressionResolver.getNameScopes
@@ -99,7 +102,21 @@ trait FunctionResolverUtils {
unresolvedFunction: UnresolvedFunction,
normalizeFunctionName: Boolean = true
): Boolean = {
- !unresolvedFunction.isDistinct && isCount(unresolvedFunction, normalizeFunctionName)
+ !unresolvedFunction.isDistinct &&
+ isCount(unresolvedFunction, normalizeFunctionName) &&
+ !isUnqualifiedCountShadowedByTemp(unresolvedFunction)
+ }
+
+ /**
+ * Keep single-pass behavior aligned with fixed-point: when PATH puts system.session before
+ * system.builtin and a temp `count` exists, unqualified `count(*)` must not be rewritten to
+ * `count(1)`.
+ */
+ private def isUnqualifiedCountShadowedByTemp(unresolvedFunction: UnresolvedFunction): Boolean = {
+ unresolvedFunction.nameParts.length == 1 &&
+ functionResolution.isSessionBeforeBuiltinInPath &&
+ functionResolution.catalogManager.v1SessionCatalog
+ .isTemporaryFunction(FunctionIdentifier(unresolvedFunction.nameParts.head))
}
private def isCount(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HigherOrderFunctionResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HigherOrderFunctionResolver.scala
index 6b90a5c05baf..676ef381f2f1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HigherOrderFunctionResolver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HigherOrderFunctionResolver.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
*/
class HigherOrderFunctionResolver(
protected val expressionResolver: ExpressionResolver,
- functionResolution: FunctionResolution)
+ protected val functionResolution: FunctionResolution)
extends TreeNodeResolver[UnresolvedFunction, Expression]
with ProducesUnresolvedSubtree
with CoercesExpressionTypes
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolutionValidator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolutionValidator.scala
index 6bdf2d4b0615..793df7dc44a7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolutionValidator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolutionValidator.scala
@@ -107,6 +107,9 @@ class ResolutionValidator {
validateRepartitionByExpression(repartitionByExpression)
case sample: Sample =>
validateSample(sample)
+ case deserializeToObject: DeserializeToObject =>
+ validate(deserializeToObject.child)
+ handleOperatorOutput(deserializeToObject)
case generate: Generate =>
validateGenerate(generate)
case expand: Expand =>
@@ -118,6 +121,9 @@ class ResolutionValidator {
validateRelation(multiInstanceRelation)
case supervisingCommand: SupervisingCommand =>
validateSupervisingCommand(supervisingCommand)
+ case cmd: Command if cmd.children.isEmpty =>
+ // Session side-effect commands (e.g. SET PATH) are leaves with no query subtree.
+ ()
}
operator match {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala
index aaf7117ef4e8..a811f5ce6d88 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala
@@ -136,6 +136,15 @@ class Resolver(
*/
private val planRewriter = new PlanRewriter(planRewriteRules, extendedRewriteRules)
+ /**
+ * Runs session post-resolution rules (see [[extendedRewriteRules]]) on a subtree. Used for
+ * operators such as [[DeserializeToObject]] whose deserializer is resolved by the same rules as
+ * the end-of-resolution [[PlanRewriter]] batch, but which must be applied before single-pass
+ * validation asserts the operator is fully resolved.
+ */
+ private def applyExtendedRewriteRulesLocally(plan: LogicalPlan): LogicalPlan =
+ extendedRewriteRules.foldLeft(plan) { case (p, rule) => rule(p) }
+
/**
* [[relationMetadataProvider]] is used to resolve metadata for relations. It's initialized with
* the default implementation [[MetadataResolver]] here and is called in
@@ -325,6 +334,9 @@ class Resolver(
resolveRepartition(repartition)
case sample: Sample =>
resolveSample(sample)
+ case deserializeToObject: DeserializeToObject =>
+ applyExtendedRewriteRulesLocally(
+ deserializeToObject.copy(child = resolve(deserializeToObject.child)))
case _ =>
tryDelegateResolutionToExtension(unresolvedPlan).getOrElse {
handleUnmatchedOperator(unresolvedPlan)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 32aa8cccbd93..249700ec0d92 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -113,18 +113,67 @@ class SessionCatalog(
identifier.copy(funcName = "") == SESSION_NAMESPACE_TEMPLATE
/**
- * Session function kinds in resolution order for unqualified lookups.
- * Matches [[SQLConf.sessionFunctionResolutionOrder]]: "first" (session first),
- * "second" (default), "last" (builtin only; session tried after persistent).
+ * When set, unqualified builtin/temp function resolution uses this fixed kind order instead of
+ * [[catalogManagerForSessionFunctionKinds]] / [[SQLConf.systemPathOrder]]. For unit tests only;
+ * production relies on the catalog manager binding.
*/
- private def sessionFunctionKindsInResolutionOrder: Seq[SessionFunctionKind] = {
- conf.sessionFunctionResolutionOrder match {
- case "first" => Seq(Temp, Builtin)
- case "last" => Seq(Builtin)
- case _ => Seq(Builtin, Temp) // "second" (default)
- }
+ @volatile private var sessionFunctionKindsTestOverride: Option[Seq[SessionFunctionKind]] = None
+
+ /**
+ * Live PATH for session function kinds. Set from
+ * [[org.apache.spark.sql.connector.catalog.CatalogManager]]'s constructor via
+ * [[bindCatalogManagerForSessionFunctionKinds]] so unqualified lookups and the security check
+ * that blocks temp functions from shadowing builtins read the effective SQL PATH (post-`SET
+ * PATH`, with [[SQLConf.DEFAULT_PATH]] and [[SQLConf.defaultPathOrder]] fallbacks already
+ * applied).
+ *
+ * When unset (e.g. standalone [[SessionCatalog]] in tests), kinds derive from
+ * [[SQLConf.systemPathOrder]] -- the seeded default path -- without assuming other legacy
+ * resolution-order conf beyond seeding `defaultPathOrder`.
+ */
+ @volatile private var catalogManagerForSessionFunctionKinds: Option[CatalogManager] = None
+
+ /**
+ * Wire live PATH-derived session function kinds from the session [[CatalogManager]].
+ * Called once from [[org.apache.spark.sql.connector.catalog.CatalogManager]]'s constructor.
+ */
+ private[sql] def bindCatalogManagerForSessionFunctionKinds(cm: CatalogManager): Unit = {
+ catalogManagerForSessionFunctionKinds = Some(cm)
+ }
+
+ /**
+ * Pin session function kinds for tests (`None` clears). Uses `private[sql]` so tests under the
+ * `org.apache.spark.sql` package can control ordering without a public catalog API.
+ */
+ private[sql] def setSessionFunctionKindsTestOverride(
+ kinds: Option[Seq[SessionFunctionKind]]): Unit = {
+ sessionFunctionKindsTestOverride = kinds
}
+ /**
+ * Session function kinds in resolution order for unqualified lookups: test override if set,
+ * else live PATH from [[catalogManagerForSessionFunctionKinds]], else
+ * [[SQLConf.systemPathOrder]].
+ */
+ private def sessionFunctionKindsInResolutionOrder: Seq[SessionFunctionKind] =
+ sessionFunctionKindsTestOverride.getOrElse {
+ catalogManagerForSessionFunctionKinds match {
+ case Some(cm) =>
+ CatalogManager.systemFunctionKindsFromPath(
+ cm.sqlResolutionPathEntries(cm.currentCatalog.name(), cm.currentNamespace.toSeq))
+ case None =>
+ CatalogManager.systemFunctionKindsFromPath(conf.systemPathOrder)
+ }
+ }
+
+ /**
+ * True iff the effective SQL PATH searches `system.session` before `system.builtin`. Used
+ * to gate the security check that blocks temporary functions from silently shadowing a
+ * builtin of the same name.
+ */
+ private def sessionFirstInPath: Boolean =
+ sessionFunctionKindsInResolutionOrder.headOption.contains(Temp)
+
/**
* Checks if a namespace represents temporary functions.
*/
@@ -2080,12 +2129,11 @@ class SessionCatalog(
qualifyIdentifier(func)
}
- // Security check: When legacy mode is enabled, block SQL-created temporary functions
- // from shadowing builtin functions (to preserve master behavior)
- // Scala UDFs are still allowed to shadow in legacy mode
- // We throw ROUTINE_ALREADY_EXISTS to indicate the builtin function already exists
- val sessionFirst = conf.sessionFunctionResolutionOrder == "first"
- if (func.database.isEmpty && sessionFirst && !overrideIfExists) {
+ // Security check: when the effective SQL PATH searches `system.session` before
+ // `system.builtin`, block creating an unqualified temporary function whose name
+ // collides with a builtin so it cannot silently shadow that builtin via unqualified
+ // resolution. We throw ROUTINE_ALREADY_EXISTS to indicate the conflict.
+ if (func.database.isEmpty && sessionFirstInPath && !overrideIfExists) {
val funcName = func.funcName
// Check if function exists in builtin namespace (extensions are stored as builtins)
val builtinIdent = FunctionRegistry.builtinFunctionIdentifier(funcName)
@@ -2195,10 +2243,11 @@ class SessionCatalog(
// Use FunctionIdentifier with session namespace for temporary functions
val tempIdentifier = tempFunctionIdentifier(function.name.funcName)
- // Security check: When legacy mode is enabled, block SQL-created temporary functions
- // from shadowing builtin functions (including extensions) as a safeguard
- // We throw ROUTINE_ALREADY_EXISTS to indicate the builtin function already exists
- if ((conf.sessionFunctionResolutionOrder == "first") && !overrideIfExists) {
+ // Security check: when the effective SQL PATH searches `system.session` before
+ // `system.builtin`, block creating an unqualified temporary function whose name
+ // collides with a builtin (including extensions) so it cannot silently shadow that
+ // builtin via unqualified resolution.
+ if (sessionFirstInPath && !overrideIfExists) {
val funcName = function.name.funcName
// Check if function exists in builtin namespace (extensions are stored as builtins)
val builtinIdent = FunctionRegistry.builtinFunctionIdentifier(funcName)
@@ -2499,7 +2548,12 @@ class SessionCatalog(
* Look up the `ExpressionInfo` of the given function by name.
* Resolution order follows the configured path (e.g. builtin then session).
*/
- def lookupBuiltinOrTempTableFunction(name: String): Option[ExpressionInfo] = synchronized {
+ def lookupBuiltinOrTempTableFunction(name: String): Option[ExpressionInfo] = {
+ // Intentionally not `synchronized` on this [[SessionCatalog]]. Resolution order may call
+ // into [[CatalogManager]] (e.g. [[CatalogManager.sqlResolutionPathEntries]]), which can
+ // synchronize on the manager; another
+ // thread can hold that lock and call into this catalog (e.g. via `setCurrentNamespace`),
+ // which would deadlock if this method also synchronized on `this`.
lookupFunctionWithShadowing(name, tableFunctionRegistry, checkBuiltinOperators = false)
}
@@ -2650,7 +2704,11 @@ class SessionCatalog(
/**
* Look up the [[ExpressionInfo]] associated with the specified function, assuming it exists.
*/
- def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo = synchronized {
+ def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo = {
+ // Intentionally not `synchronized` on this [[SessionCatalog]] (see
+ // [[lookupBuiltinOrTempTableFunction]]): unqualified builtin/temp resolution uses
+ // [[sessionFunctionKindsInResolutionOrder]] / [[CatalogManager]] and must not run under
+ // this catalog's intrinsic lock.
if (name.database.isEmpty) {
lookupBuiltinOrTempFunction(name.funcName)
.orElse(lookupBuiltinOrTempTableFunction(name.funcName))
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AbstractSqlParser.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AbstractSqlParser.scala
index 216136d8a7c8..29bf924f244e 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AbstractSqlParser.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AbstractSqlParser.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.ParserUtils.withOrigin
import org.apache.spark.sql.catalyst.plans.logical.{CompoundPlanStatement,
LogicalPlan}
import org.apache.spark.sql.catalyst.trees.Origin
+import org.apache.spark.sql.connector.catalog.PathElement
import org.apache.spark.sql.errors.QueryParsingErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
@@ -110,6 +111,18 @@ abstract class AbstractSqlParser extends AbstractParser
with ParserInterface {
}
}
+ /**
+ * Parse the right-hand side of `SET PATH = ...` (a comma-separated list of path elements).
+ * Used by [[org.apache.spark.sql.connector.catalog.CatalogManager]] to honor the
+ * [[SQLConf.DEFAULT_PATH]] conf without re-implementing the SET PATH grammar.
+ */
+ private[sql] def parsePathElements(sqlText: String): Seq[PathElement] = parse(sqlText) { parser =>
+ val ctx = parser.singlePathElementList()
+ withErrorHandling(ctx, Some(sqlText)) {
+ astBuilder.visitSinglePathElementList(ctx)
+ }
+ }
+
def withErrorHandling[T](ctx: ParserRuleContext, sqlText:
Option[String])(toResult: => T): T = {
withOrigin(ctx, sqlText) {
try {
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 12446b7cc9d7..da232a99f19b 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -47,7 +47,7 @@ import
org.apache.spark.sql.catalyst.trees.TreePattern.PARAMETER
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, CollationFactory,
DateTimeUtils, EvaluateUnresolvedInlineTable, IntervalUtils}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate,
convertSpecialTimestamp, convertSpecialTimestampNTZ, getZoneId, stringToDate,
stringToTime, stringToTimestamp, stringToTimestampWithoutTimeZone}
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, ChangelogInfo,
SupportsNamespaces, TableCatalog, TableWritePrivilege}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, ChangelogInfo,
PathElement, SupportsNamespaces, TableCatalog, TableWritePrivilege}
import org.apache.spark.sql.connector.catalog.ChangelogRange.{TimestampRange,
UnboundedRange, VersionRange}
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
import org.apache.spark.sql.connector.expressions.{ApplyTransform,
BucketTransform, DaysTransform, Expression => V2Expression, FieldReference,
HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform,
YearsTransform}
@@ -708,6 +708,26 @@ class AstBuilder extends DataTypeAstBuilder
visitMultipartIdentifier(ctx.multipartIdentifier)
}
+ override def visitSinglePathElementList(
+ ctx: SinglePathElementListContext): Seq[PathElement] = withOrigin(ctx) {
+ ctx.pathElement().asScala.map(visitPathElement).toSeq
+ }
+
+ override def visitPathElement(ctx: PathElementContext): PathElement = withOrigin(ctx) {
+ if (ctx.DEFAULT_PATH() != null) PathElement.DefaultPath
+ else if (ctx.SYSTEM_PATH() != null) PathElement.SystemPath
+ else if (ctx.PATH() != null) PathElement.PathRef
+ else if (ctx.CURRENT_DATABASE() != null || ctx.CURRENT_SCHEMA() != null) {
+ PathElement.CurrentSchema
+ } else {
+ val parts = visitMultipartIdentifier(ctx.multipartIdentifier())
+ if (parts.length < 2) {
+ throw QueryCompilationErrors.invalidSqlPathSchemaReferenceError(parts.mkString("."))
+ }
+ PathElement.SchemaInPath(parts)
+ }
+ }
+
override def visitSingleDataType(ctx: SingleDataTypeContext): DataType =
withOrigin(ctx) {
typedVisit[DataType](ctx.dataType)
}
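The element classification done by `visitPathElement` can be illustrated with a minimal, self-contained sketch. This is not Spark's parser: it assumes a plain comma-separated string, splits identifiers on `.` only (no backtick quoting), and every name in it (`PathElementSketch`, `classify`, `parse`) is hypothetical.

```scala
import java.util.Locale

// Hypothetical, simplified model of SET PATH element classification.
// It does not use Spark's ANTLR grammar and ignores quoted identifiers.
object PathElementSketch {
  sealed trait Elem
  case object DefaultPath extends Elem
  case object SystemPath extends Elem
  case object PathRef extends Elem
  case object CurrentSchema extends Elem
  final case class SchemaInPath(parts: Seq[String]) extends Elem

  // Classify one token; keywords are matched case-insensitively (an assumption).
  def classify(token: String): Either[String, Elem] = {
    val t = token.trim
    t.toUpperCase(Locale.ROOT) match {
      case "DEFAULT_PATH" => Right(DefaultPath)
      case "SYSTEM_PATH" => Right(SystemPath)
      case "PATH" => Right(PathRef)
      // Both aliases collapse to a single element, as in the diff above.
      case "CURRENT_SCHEMA" | "CURRENT_DATABASE" => Right(CurrentSchema)
      case _ =>
        val parts = t.split('.').toSeq
        // A schema reference needs at least catalog.namespace.
        if (parts.length < 2 || parts.exists(_.isEmpty)) Left(s"invalid schema reference: $t")
        else Right(SchemaInPath(parts))
    }
  }

  // Parse a comma-separated list, stopping at the first invalid element.
  def parse(text: String): Either[String, Seq[Elem]] =
    text.split(',').toSeq.foldLeft[Either[String, Seq[Elem]]](Right(Seq.empty)) {
      (acc, tok) => acc.flatMap(done => classify(tok).map(done :+ _))
    }
}
```

For example, `PathElementSketch.parse("system_path, cat.db")` yields the two elements in order, while a single-part name such as `db` is rejected.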
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
index f73b0e68cecc..5f4c71297ac5 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.connector.catalog
+import java.util.concurrent.atomic.AtomicReference
+
import scala.collection.mutable
import scala.util.Try
@@ -24,6 +26,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.catalog.{SessionCatalog,
TempVariableManager}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.connector.catalog.transactions.Transaction
import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -52,6 +55,12 @@ class CatalogManager(
// TODO: create a real SYSTEM catalog to host `TempVariableManager` under
the SESSION namespace.
val tempVariableManager: TempVariableManager = new TempVariableManager
+ // Wire `SessionCatalog`'s fast-path kinds to the live SQL PATH. The kinds list itself is
+ // pure data conversion (system entries from the path, in path order); the *decision* to use
+ // path-order kinds for unqualified lookups lives at the Strategy layer (see callers of
+ // [[CatalogManager.systemFunctionKindsFromPath]]).
+ v1SessionCatalog.bindCatalogManagerForSessionFunctionKinds(this)
+
def catalog(name: String): CatalogPlugin = synchronized {
if (name.equalsIgnoreCase(SESSION_CATALOG_NAME)) {
v2SessionCatalog
@@ -138,8 +147,60 @@ class CatalogManager(
private var _sessionPath: Option[Seq[SessionPathEntry]] = None
- /** Returns the raw stored session path entries, or None if no path is set.
*/
- def sessionPathEntries: Option[Seq[SessionPathEntry]] = synchronized {
_sessionPath }
+ /**
+ * Cache for [[confDefaultPathEntries]]: stores the expanded [[SessionPathEntry]] list keyed
+ * on the trimmed [[SQLConf.DEFAULT_PATH]] string and
+ * [[SQLConf.SESSION_FUNCTION_RESOLUTION_ORDER]] value (the only conf that affects the
+ * expansion of `DEFAULT_PATH` / `SYSTEM_PATH` tokens).
+ * `CurrentSchemaEntry` markers are preserved unresolved so the cache stays valid across
+ * `USE SCHEMA`.
+ */
+ private val confDefaultPathCache =
+ new AtomicReference[Option[(String, String, Seq[SessionPathEntry])]](None)
+
+ /**
+ * Returns the effective session path entries: the explicit `SET PATH` value if stored,
+ * else the parsed [[SQLConf.DEFAULT_PATH]] conf if non-empty (mirroring how
+ * [[currentCatalog]] falls back to [[SQLConf.DEFAULT_CATALOG]]). Returns `None` when
+ * [[SQLConf.PATH_ENABLED]] is false or both sources are empty.
+ */
+ def sessionPathEntries: Option[Seq[SessionPathEntry]] = synchronized {
+ if (!conf.pathEnabled) None
+ else _sessionPath.orElse(confDefaultPathEntries)
+ }
+
+ /** Raw `_sessionPath` (post-`SET PATH`), without the [[SQLConf.DEFAULT_PATH]] fallback. */
+ def storedSessionPathEntries: Option[Seq[SessionPathEntry]] = synchronized { _sessionPath }
+
+ /**
+ * Parsed and expanded [[SQLConf.DEFAULT_PATH]] value, or `None` when the conf is empty.
+ * Reuses the SET PATH grammar via [[CatalystSqlParser.parsePathElements]]. An inner
+ * `DEFAULT_PATH` token resolves to the spark-builtin default ordering (cycle break).
+ *
+ * Unlike `SET PATH`, this does NOT run a duplicate check: lookup uses first-match
+ * resolution, so any redundant entry (including ones that only collide after a later
+ * `USE SCHEMA`) is dead code rather than an error. Cached so the hot path is a single
+ * atomic load on conf-stable sessions.
+ */
+ def confDefaultPathEntries: Option[Seq[SessionPathEntry]] = {
+ val confValue = conf.defaultPath
+ if (confValue == null || confValue.trim.isEmpty) {
+ confDefaultPathCache.set(None)
+ None
+ } else {
+ val trimmed = confValue.trim
+ val sessionOrder = conf.sessionFunctionResolutionOrder
+ val expanded = confDefaultPathCache.get() match {
+ case Some((k, ord, cached)) if k == trimmed && ord == sessionOrder => cached
+ case _ =>
+ val elements = CatalystSqlParser.parsePathElements(trimmed)
+ val computed = PathElement.expand(elements, conf, this, isConfDefaultExpansion = true)
+ confDefaultPathCache.set(Some((trimmed, sessionOrder, computed)))
+ computed
+ }
+ if (expanded.isEmpty) None else Some(expanded)
+ }
+ }
def setSessionPath(entries: Seq[SessionPathEntry]): Unit = synchronized {
_sessionPath = Some(entries)
@@ -150,18 +211,18 @@ class CatalogManager(
}
private[sql] def copySessionPathFrom(other: CatalogManager): Unit =
synchronized {
- _sessionPath = other.sessionPathEntries
+ _sessionPath = other.storedSessionPathEntries
}
/**
* String form of the current resolution path for CURRENT_PATH().
- * When PATH is enabled and a session path is stored, formats the effective
path entries
- * with markers expanded. Otherwise falls back to the legacy
resolutionSearchPath.
+ * When PATH is enabled and a session path is in effect (stored or via
+ * [[SQLConf.DEFAULT_PATH]]), formats the resolved entries. Otherwise falls
back to the legacy
+ * resolutionSearchPath.
*/
def currentPathString: String = synchronized {
import CatalogV2Implicits._
- val stored = if (conf.pathEnabled) _sessionPath else None
- stored match {
+ sessionPathEntries match {
case Some(entries) =>
val resolved = CatalogManager.resolvePathEntries(
entries, currentCatalog.name(), currentNamespace.toSeq)
@@ -175,7 +236,8 @@ class CatalogManager(
/**
* Ordered catalog/schema path entries for resolving unqualified SQL object
names.
* When PATH is off or unset, applies [[SQLConf.defaultPathOrder]] (legacy).
- * When PATH is explicitly set, uses the resolved stored path entries.
+ * When PATH is in effect (stored or via the [[SQLConf.DEFAULT_PATH]] conf),
uses the
+ * resolved entries.
*/
def sqlResolutionPathEntries(
pathDefaultCatalog: String,
@@ -185,8 +247,7 @@ class CatalogManager(
val defaultEntry =
if (pathDefaultNamespace.isEmpty) Seq(pathDefaultCatalog)
else pathDefaultCatalog +: pathDefaultNamespace
- val stored = if (conf.pathEnabled) _sessionPath else None
- stored match {
+ sessionPathEntries match {
case Some(entries) =>
CatalogManager.resolvePathEntries(entries, expandCatalog,
expandNamespace)
case None =>
@@ -209,11 +270,11 @@ class CatalogManager(
* [[org.apache.spark.sql.catalyst.analysis.FakeSystemCatalog]] /
`lookupBuiltinOrTempFunction`,
* not loadable via [[catalog]]), so `currentCatalog.name()` cannot be
`"system"`. If that
* invariant ever changes, this short-circuit must be revisited.
- * Inspecting stored entries directly avoids loading the configured default
catalog.
+ * Inspecting effective entries directly avoids loading the configured
default catalog.
*/
def isSystemSessionOnPath: Boolean = synchronized {
if (!conf.pathEnabled) return true
- _sessionPath match {
+ sessionPathEntries match {
case None => true
case Some(entries) => entries.exists {
case CatalogManager.LiteralPathEntry(parts) =>
@@ -289,6 +350,7 @@ class CatalogManager(
_currentNamespace = None
_currentCatalogName = None
_sessionPath = None
+ confDefaultPathCache.set(None)
v1SessionCatalog.setCurrentDatabase(conf.defaultDatabase)
}
}
@@ -340,9 +402,30 @@ private[sql] object CatalogManager extends Logging {
isFullyQualifiedSystemSessionViewName(nameParts)
}
- /** True if a SQL path entry is the well-known `system.session` entry. */
+ /** True if a SQL path entry is the well-known `system.session` entry (case-insensitive). */
def isSystemSessionPathEntry(parts: Seq[String]): Boolean =
- parts == Seq(SYSTEM_CATALOG_NAME, SESSION_NAMESPACE)
+ parts.length == 2 &&
+ parts.head.equalsIgnoreCase(SYSTEM_CATALOG_NAME) &&
+ parts(1).equalsIgnoreCase(SESSION_NAMESPACE)
+
+ /** True if a SQL path entry is the well-known `system.builtin` entry (case-insensitive). */
+ def isSystemBuiltinPathEntry(parts: Seq[String]): Boolean =
+ parts.length == 2 &&
+ parts.head.equalsIgnoreCase(SYSTEM_CATALOG_NAME) &&
+ parts(1).equalsIgnoreCase(BUILTIN_NAMESPACE)
+
+ /**
+ * Extract `system.builtin` / `system.session` entries from a resolved PATH, mapped to
+ * [[SessionCatalog.SessionFunctionKind]] in path order. Pure data conversion -- callers
+ * decide whether and how to use this list.
+ */
+ def systemFunctionKindsFromPath(
+ path: Seq[Seq[String]]): Seq[SessionCatalog.SessionFunctionKind] =
+ path.flatMap { e =>
+ if (isSystemBuiltinPathEntry(e)) Some(SessionCatalog.Builtin)
+ else if (isSystemSessionPathEntry(e)) Some(SessionCatalog.Temp)
+ else None
+ }
/**
* A single entry in the session SQL path: either a literal schema
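As a rough illustration of `systemFunctionKindsFromPath` above, the following standalone sketch maps a resolved path to function kinds in path order. The `Kind` type and the `sessionFirst` helper are hypothetical stand-ins (the latter only approximates what a check such as `sessionFirstInPath` might look at); only the `system.builtin` / `system.session` names come from the diff.

```scala
// Hypothetical stand-in for SessionCatalog.SessionFunctionKind and the path-to-kinds mapping.
object FunctionKindsSketch {
  sealed trait Kind
  case object Builtin extends Kind
  case object Temp extends Kind

  // Case-insensitive match on a two-part `system.<namespace>` entry.
  private def isSystemEntry(parts: Seq[String], namespace: String): Boolean =
    parts.length == 2 &&
      parts.head.equalsIgnoreCase("system") &&
      parts(1).equalsIgnoreCase(namespace)

  // Keep only system.builtin / system.session entries, preserving path order.
  def kindsFromPath(path: Seq[Seq[String]]): Seq[Kind] =
    path.flatMap { e =>
      if (isSystemEntry(e, "builtin")) Some(Builtin)
      else if (isSystemEntry(e, "session")) Some(Temp)
      else None
    }

  // Assumed helper: true when the first system entry on the path is system.session.
  def sessionFirst(path: Seq[Seq[String]]): Boolean =
    kindsFromPath(path).headOption.contains(Temp)
}
```

Non-system entries such as `spark_catalog.default` are simply dropped from the kinds list; whether callers use the list at all is left to them, as the doc comment says.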
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/PathElement.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/PathElement.scala
new file mode 100644
index 000000000000..ad5cf0e161ea
--- /dev/null
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/PathElement.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog
+
+import java.util.Locale
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.connector.catalog.CatalogManager.{
+ CurrentSchemaEntry, LiteralPathEntry, SessionPathEntry
+}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * One element on the right-hand side of `SET PATH = ...`: either a well-known shortcut
+ * keyword (DEFAULT_PATH, SYSTEM_PATH, PATH, CURRENT_SCHEMA / CURRENT_DATABASE) or a
+ * fully qualified schema reference (`catalog.namespace...` with at least 2 parts).
+ *
+ * The same grammar is reused to parse the [[SQLConf.DEFAULT_PATH]] conf value, so this
+ * AST node lives in catalyst beside [[CatalogManager]] rather than in the runtime
+ * [[org.apache.spark.sql.execution.command.SetPathCommand]].
+ */
+private[sql] sealed trait PathElement
+
+private[sql] object PathElement {
+ case object DefaultPath extends PathElement
+ case object SystemPath extends PathElement
+ case object PathRef extends PathElement
+
+ /**
+ * Current database/schema (SQL aliases). Stored as the [[CurrentSchemaEntry]] marker
+ * so resolution candidates expand against the live `USE SCHEMA`.
+ */
+ case object CurrentSchema extends PathElement
+
+ /** Fully qualified schema reference (`catalog.namespace...`). Must have at least 2 parts. */
+ case class SchemaInPath(parts: Seq[String]) extends PathElement
+
+ /**
+ * Expand a parsed [[PathElement]] list into concrete [[SessionPathEntry]] entries
+ * suitable for storing in [[CatalogManager._sessionPath]] or returning from
+ * [[CatalogManager.sessionPathEntries]].
+ *
+ * @param isConfDefaultExpansion when true, an inner [[DefaultPath]] token resolves
+ * to the spark-builtin default ordering (cycle break)
+ * rather than reading [[SQLConf.DEFAULT_PATH]] again.
+ * Set to true when this method is invoked while
+ * parsing [[SQLConf.DEFAULT_PATH]] itself.
+ */
+ def expand(
+ elements: Seq[PathElement],
+ conf: SQLConf,
+ catalogManager: CatalogManager,
+ isConfDefaultExpansion: Boolean = false): Seq[SessionPathEntry] = {
+ val currentSchemaSentinel = Seq("__current_schema__")
+
+ def toEntries(parts: Seq[Seq[String]]): Seq[SessionPathEntry] = parts.map {
+ case p if p == currentSchemaSentinel => CurrentSchemaEntry
+ case p => LiteralPathEntry(p)
+ }
+
+ def builtinDefaultWithCurrentSchema: Seq[SessionPathEntry] =
+ toEntries(conf.defaultPathOrder(Seq(currentSchemaSentinel)))
+
+ def defaultPathExpansion: Seq[SessionPathEntry] = {
+ if (isConfDefaultExpansion) {
+ // Cycle break: inner DEFAULT_PATH inside the conf default value falls back to the
+ // spark-builtin default ordering instead of recursing.
+ builtinDefaultWithCurrentSchema
+ } else {
+ catalogManager.confDefaultPathEntries.getOrElse(builtinDefaultWithCurrentSchema)
+ }
+ }
+
+ elements.flatMap {
+ case DefaultPath =>
+ defaultPathExpansion
+ case SystemPath =>
+ toEntries(conf.systemPathOrder)
+ case CurrentSchema =>
+ Seq(CurrentSchemaEntry)
+ case PathRef =>
+ catalogManager.storedSessionPathEntries.getOrElse(defaultPathExpansion)
+ case SchemaInPath(parts) =>
+ Seq(LiteralPathEntry(parts))
+ }
+ }
+
+ /**
+ * Reject *static* duplicates in a SET PATH entry list: identical [[LiteralPathEntry]] parts
+ * and repeated [[CurrentSchemaEntry]] markers (the `current_schema` / `current_database`
+ * cross-alias case). Used for the interactive `SET PATH` form to surface user typos at
+ * statement time.
+ *
+ * Deliberately does NOT compare a [[LiteralPathEntry]] against a [[CurrentSchemaEntry]]:
+ * such a "duplicate" depends on the live `USE SCHEMA` and is harmless at lookup (first-match
+ * resolution skips the dead literal). [[SQLConf.DEFAULT_PATH]] expansion skips this check
+ * entirely so transient `USE`-induced collisions don't wedge unqualified resolution.
+ */
+ def validateNoStaticDuplicates(
+ entries: Seq[SessionPathEntry],
+ caseSensitive: Boolean): Seq[SessionPathEntry] = {
+ val seenLiterals = new mutable.HashSet[Seq[String]]
+ var seenCurrentSchema = false
+ entries.foreach {
+ case CurrentSchemaEntry =>
+ if (seenCurrentSchema) {
+ throw new AnalysisException(
+ errorClass = "DUPLICATE_SQL_PATH_ENTRY",
+ messageParameters = Map("pathEntry" -> "current_schema"))
+ }
+ seenCurrentSchema = true
+ case LiteralPathEntry(parts) =>
+ val key = if (caseSensitive) parts else parts.map(_.toLowerCase(Locale.ROOT))
+ if (!seenLiterals.add(key)) {
+ throw new AnalysisException(
+ errorClass = "DUPLICATE_SQL_PATH_ENTRY",
+ messageParameters = Map(
+ "pathEntry" ->
+ parts.map(p => if (p.contains(".")) s"`$p`" else p).mkString(".")))
+ }
+ }
+ entries
+ }
+}
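The cycle-safe expansion and the static duplicate check in `PathElement` can be sketched in isolation. This is a simplified model, not the committed code: the types are re-declared locally, `builtinDefault` is an assumed stand-in for what `conf.defaultPathOrder` would return, the conf default is passed in already parsed, and the duplicate check returns the offending entry instead of throwing.

```scala
import java.util.Locale
import scala.collection.mutable

// Hypothetical standalone model of PathElement.expand / validateNoStaticDuplicates.
object PathExpansionSketch {
  sealed trait Elem
  case object DefaultPath extends Elem
  case object CurrentSchema extends Elem
  final case class Schema(parts: Seq[String]) extends Elem

  sealed trait Entry
  case object CurrentSchemaEntry extends Entry
  final case class Literal(parts: Seq[String]) extends Entry

  // Assumed built-in ordering, for illustration only.
  val builtinDefault: Seq[Entry] =
    Seq(Literal(Seq("system", "builtin")), Literal(Seq("system", "session")), CurrentSchemaEntry)

  // `confDefault` is the already-parsed conf value, if any. While expanding the conf
  // value itself (`expandingConf = true`), DEFAULT_PATH must not read the conf again.
  def expand(
      elems: Seq[Elem],
      confDefault: Option[Seq[Elem]],
      expandingConf: Boolean): Seq[Entry] =
    elems.flatMap {
      case DefaultPath if expandingConf => builtinDefault // cycle break
      case DefaultPath =>
        confDefault.map(expand(_, confDefault, expandingConf = true)).getOrElse(builtinDefault)
      case CurrentSchema => Seq(CurrentSchemaEntry)
      case Schema(parts) => Seq(Literal(parts))
    }

  // First static duplicate, if any. A Literal is never compared with CurrentSchemaEntry.
  def firstStaticDuplicate(entries: Seq[Entry], caseSensitive: Boolean): Option[Entry] = {
    val seen = mutable.HashSet.empty[Entry]
    entries.find {
      case CurrentSchemaEntry => !seen.add(CurrentSchemaEntry)
      case Literal(parts) =>
        val key = if (caseSensitive) parts else parts.map(_.toLowerCase(Locale.ROOT))
        !seen.add(Literal(key))
    }
  }
}
```

With a conf default of `cat.util, DEFAULT_PATH`, expanding `DEFAULT_PATH` yields `cat.util` followed by the built-in ordering; the inner token does not recurse into the conf a second time.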
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 46574b2f489b..f9144378c1f3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -43,9 +43,11 @@ import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis.{HintErrorLogger, Resolver}
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.HintErrorHandler
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import
org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
+import org.apache.spark.sql.connector.catalog.PathElement.PathRef
import org.apache.spark.sql.errors.{QueryCompilationErrors,
QueryExecutionErrors}
import org.apache.spark.sql.types.{AtomicType, TimestampNTZType, TimestampType}
import org.apache.spark.storage.{StorageLevel, StorageLevelMapper}
@@ -2471,6 +2473,29 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val DEFAULT_PATH =
+ buildConf("spark.sql.defaultPath")
+ .version("4.2.0")
+ .doc("Default SQL PATH used when no SET PATH has been issued in the session; this is " +
+ "also the value to which `SET PATH = DEFAULT_PATH` expands. Accepts the full SET PATH " +
+ "grammar; an inner DEFAULT_PATH token resolves to the spark-builtin default ordering. " +
+ "The PATH keyword is not allowed in this conf value. " +
+ "When empty, the spark-builtin default ordering controlled by " +
+ "`spark.sql.functionResolution.sessionOrder` applies. Validated for syntax at set time; " +
+ "redundant entries are tolerated (lookup uses first-match resolution). The interactive " +
+ "SET PATH form still rejects static duplicates as a typo guard.")
+ .withBindingPolicy(ConfigBindingPolicy.SESSION)
+ .stringConf
+ .checkValue(
+ v =>
+ v == null || v.trim.isEmpty ||
+ Try(CatalystSqlParser.parsePathElements(v.trim))
+ .toOption
+ .exists(!_.contains(PathRef)),
+ "The value must be empty or a comma-separated SET PATH element list " +
+ "(same grammar as SET PATH, except PATH is not allowed).")
+ .createWithDefault("")
+
// Whether to retain group by columns or not in GroupedData.agg.
val DATAFRAME_RETAIN_GROUP_COLUMNS =
buildConf("spark.sql.retainGroupColumns")
.version("1.4.0")
@@ -8555,6 +8580,8 @@ class SQLConf extends Serializable with Logging with
SqlApiConf {
def pathEnabled: Boolean = getConf(SQLConf.PATH_ENABLED)
+ def defaultPath: String = getConf(SQLConf.DEFAULT_PATH)
+
/**
* Returns the resolution search path for error messages and resolution
order.
* This is the single source of truth for the search path used for
functions, tables, and views.
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 8f4c77840f0c..b4ece7329094 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -359,15 +359,7 @@ class SparkSqlAstBuilder extends AstBuilder {
* }}}
*/
override def visitSetPath(ctx: SetPathContext): LogicalPlan =
withOrigin(ctx) {
- val elements = ctx.pathElement().asScala.map { pe =>
- if (pe.DEFAULT_PATH() != null) PathElement.DefaultPath
- else if (pe.SYSTEM_PATH() != null) PathElement.SystemPath
- else if (pe.PATH() != null) PathElement.PathRef
- else if (pe.CURRENT_DATABASE() != null) PathElement.CurrentDatabase
- else if (pe.CURRENT_SCHEMA() != null) PathElement.CurrentSchema
- else
PathElement.SchemaInPath(visitMultipartIdentifier(pe.multipartIdentifier()))
- }.toSeq
- SetPathCommand(elements)
+ SetPathCommand(ctx.pathElement().asScala.map(visitPathElement).toSeq)
}
/**
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/LeafRunnableCommandResolver.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/LeafRunnableCommandResolver.scala
new file mode 100644
index 000000000000..07ac4afd432b
--- /dev/null
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/LeafRunnableCommandResolver.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.catalyst.analysis.resolver.{
+ LogicalPlanResolver,
+ ResolverExtension
+}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+/**
+ * [[ResolverExtension]] so the single-pass analyzer accepts [[LeafRunnableCommand]] plans.
+ * These commands have no logical-plan children ([[RunnableCommand]] reports an empty child
+ * list); nested SQL or expressions are parsed and analyzed inside [[RunnableCommand.run]].
+ */
+private[sql] class LeafRunnableCommandResolver extends ResolverExtension {
+ override def resolveOperator(
+ operator: LogicalPlan,
+ resolver: LogicalPlanResolver): Option[LogicalPlan] = operator match {
+ case cmd: LeafRunnableCommand =>
+ Some(cmd)
+ case _ =>
+ None
+ }
+}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala
index 70538160eefd..82ab46ec9b14 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala
@@ -17,40 +17,17 @@
package org.apache.spark.sql.execution.command
-import java.util.Locale
-
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.connector.catalog.CatalogManager
-import org.apache.spark.sql.connector.catalog.CatalogManager.{
- CurrentSchemaEntry, LiteralPathEntry, SessionPathEntry
-}
-import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.connector.catalog.PathElement
import org.apache.spark.sql.internal.SQLConf
-/**
- * Path element for SET PATH: either a well-known shortcut or a fully
qualified schema reference.
- * SchemaInPath requires at least 2 parts (catalog.namespace); multi-level
namespaces are allowed.
- */
-sealed trait PathElement
-
-object PathElement {
- case object DefaultPath extends PathElement
- case object SystemPath extends PathElement
- case object PathRef extends PathElement
- /**
- * Current database/schema (SQL aliases). Stored as system.current_schema;
expands when
- * building resolution candidates so later USE SCHEMA is reflected.
- */
- case object CurrentDatabase extends PathElement
- case object CurrentSchema extends PathElement
- /** Fully qualified schema reference (catalog.namespace...). Must have at
least 2 parts. */
- case class SchemaInPath(parts: Seq[String]) extends PathElement
-}
-
/**
* Command for SET PATH = pathElement (, pathElement)*
* Expands shortcuts at run time, validates no duplicates, and sets the
internal session path.
+ *
+ * The [[PathElement]] AST and its expansion live in catalyst so that the same grammar can be
+ * reused to parse the [[SQLConf.DEFAULT_PATH]] conf value.
*/
case class SetPathCommand(elements: Seq[PathElement]) extends
LeafRunnableCommand {
@@ -64,23 +41,9 @@ case class SetPathCommand(elements: Seq[PathElement])
extends LeafRunnableComman
}
val conf = sparkSession.sessionState.conf
val catalogManager = sparkSession.sessionState.catalogManager
- val currentCatalog = catalogManager.currentCatalog.name
- val currentNamespace = catalogManager.currentNamespace.toSeq
- val caseSensitive = conf.caseSensitiveAnalysis
- val expanded = expandPathElements(elements, conf, catalogManager)
- val seen = new scala.collection.mutable.HashSet[Seq[String]]
- expanded.foreach { entry =>
- val concrete = entry.resolve(currentCatalog, currentNamespace)
- def normalize(s: String): String = if (caseSensitive) s else
s.toLowerCase(Locale.ROOT)
- val key = concrete.map(normalize)
- if (!seen.add(key)) {
- throw new AnalysisException(
- errorClass = "DUPLICATE_SQL_PATH_ENTRY",
- messageParameters = Map("pathEntry" ->
- concrete.map(p => if (p.contains(".")) s"`$p`" else
p).mkString(".")))
- }
- }
+ val expanded0 = PathElement.expand(elements, conf, catalogManager)
+ val expanded = PathElement.validateNoStaticDuplicates(expanded0, conf.caseSensitiveAnalysis)
if (expanded.isEmpty) {
catalogManager.clearSessionPath()
@@ -89,36 +52,4 @@ case class SetPathCommand(elements: Seq[PathElement])
extends LeafRunnableComman
}
Seq.empty
}
-
- private def expandPathElements(
- elements: Seq[PathElement],
- conf: SQLConf,
- catalogManager: CatalogManager): Seq[SessionPathEntry] = {
- val currentSchemaSentinel = Seq("__current_schema__")
-
- def toEntries(parts: Seq[Seq[String]]): Seq[SessionPathEntry] = parts.map {
- case p if p == currentSchemaSentinel => CurrentSchemaEntry
- case p => LiteralPathEntry(p)
- }
-
- def defaultWithCurrentSchema: Seq[SessionPathEntry] =
- toEntries(conf.defaultPathOrder(Seq(currentSchemaSentinel)))
-
- elements.flatMap {
- case PathElement.DefaultPath =>
- defaultWithCurrentSchema
- case PathElement.SystemPath =>
- toEntries(conf.systemPathOrder)
- case PathElement.CurrentDatabase | PathElement.CurrentSchema =>
- Seq(CurrentSchemaEntry)
- case PathElement.PathRef =>
- catalogManager.sessionPathEntries.getOrElse(defaultWithCurrentSchema)
- case PathElement.SchemaInPath(parts) =>
- if (parts.length < 2) {
- throw
QueryCompilationErrors.invalidSqlPathSchemaReferenceError(parts.mkString("."))
- }
- Seq(LiteralPathEntry(parts))
- }
- }
-
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
index 5929e5c56f90..ff54b49eed3a 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
@@ -167,9 +167,13 @@ case class DropFunctionCommand(
identifier.funcName
}
- // Check if temp function exists first - if it does, allow dropping it
even if a builtin
- // with the same name exists (shadowing case)
- if (!catalog.isTemporaryFunction(FunctionIdentifier(funcName)) &&
+ // Keep DROP TEMPORARY FUNCTION semantics consistent for unqualified names:
+ // - builtin name, no temp present, no IF EXISTS => FORBIDDEN_OPERATION
+ // - IF EXISTS => no-op
+ // Qualified temp namespaces (session / system.session) always target temp functions.
+ if (identifier.database.isEmpty &&
+ !ifExists &&
+ !catalog.isTemporaryFunction(FunctionIdentifier(funcName)) &&
catalog.isBuiltinFunction(funcName)) {
throw QueryCompilationErrors.cannotDropBuiltinFuncError(funcName)
}
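The guard in `DropFunctionCommand` above reduces to a four-input decision. A minimal sketch of that decision, with hypothetical names (`DropTempFunctionSketch`, `decide`, `Outcome`) and with "proceed" standing for whatever the rest of the command does afterwards:

```scala
// Hypothetical model of the builtin-drop guard; not the actual command code.
object DropTempFunctionSketch {
  sealed trait Outcome
  case object RejectBuiltin extends Outcome // corresponds to cannotDropBuiltinFuncError
  case object Proceed extends Outcome       // continue with the normal drop path

  def decide(
      qualified: Boolean,     // name carries a session / system.session qualifier
      ifExists: Boolean,      // IF EXISTS was specified
      tempExists: Boolean,    // a temporary function with this name exists
      builtinExists: Boolean  // a builtin function with this name exists
  ): Outcome =
    // Reject only the unqualified, non-IF-EXISTS drop of a pure builtin name.
    if (!qualified && !ifExists && !tempExists && builtinExists) RejectBuiltin
    else Proceed
}
```

Under this model, adding `IF EXISTS`, qualifying the name, or having a temp function that shadows the builtin each avoids the rejection.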
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index cc8c9dcb71f5..d2bac612c774 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.{ColumnarRule, CommandExecutionMode, Query
import org.apache.spark.sql.execution.adaptive.AdaptiveRulesHolder
import org.apache.spark.sql.execution.aggregate.{ResolveEncodersInScalaAgg, ScalaUDAF}
import org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin
-import org.apache.spark.sql.execution.command.{CheckViewReferences, CommandCheck}
+import org.apache.spark.sql.execution.command.{CheckViewReferences, CommandCheck, LeafRunnableCommandResolver}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.v2.{TableCapabilityCheck, V2SessionCatalog}
import org.apache.spark.sql.execution.streaming.runtime.ResolveWriteToStream
@@ -194,7 +194,8 @@ abstract class BaseSessionStateBuilder(
customHintResolutionRules
override val singlePassResolverExtensions: Seq[ResolverExtension] = Seq(
- new LogicalRelationResolver
+ new LogicalRelationResolver,
+ new LeafRunnableCommandResolver
)
override val singlePassMetadataResolverExtensions: Seq[ResolverExtension] = Seq(
@@ -205,6 +206,8 @@ abstract class BaseSessionStateBuilder(
override val singlePassPostHocResolutionRules: Seq[Rule[LogicalPlan]] =
DetectAmbiguousSelfJoin +:
ApplyCharTypePadding +:
+ ResolveSQLFunctions +:
+ ResolveDeserializer +:
singlePassCustomPostHocResolutionRules
override val singlePassExtendedResolutionChecks: Seq[LogicalPlan => Unit] = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala
index ad6bcd414966..f642393ea139 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql
+import org.apache.spark.SparkIllegalArgumentException
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -211,7 +212,7 @@ class SetPathSuite extends SharedSparkSession {
},
condition = "DUPLICATE_SQL_PATH_ENTRY",
sqlState = Some("42732"),
- parameters = Map("pathEntry" -> "spark_catalog.default"))
+ parameters = Map("pathEntry" -> "current_schema"))
}
}
@@ -231,16 +232,17 @@ class SetPathSuite extends SharedSparkSession {
}
}
- test("PATH enabled: duplicate after expanding CURRENT_SCHEMA") {
+ test("PATH enabled: literal + CURRENT_SCHEMA collision is tolerated (USE-state dependent)") {
+ // SET PATH only rejects static duplicates (literal-vs-literal, current_schema repeated).
+ // A literal that happens to match the live current_schema is not flagged: a later
+ // `USE SCHEMA` may make them diverge, and at lookup the first match wins anyway.
+ // `system.builtin` is included so `current_path()` itself remains resolvable.
withPathEnabled {
sql("USE spark_catalog.default")
- checkError(
- exception = intercept[AnalysisException] {
- sql("SET PATH = spark_catalog.default, current_schema")
- },
- condition = "DUPLICATE_SQL_PATH_ENTRY",
- sqlState = Some("42732"),
- parameters = Map("pathEntry" -> "spark_catalog.default"))
+ sql("SET PATH = spark_catalog.default, current_schema, system.builtin")
+ val entries = pathEntries(currentPath())
+ assert(entries === Seq("spark_catalog.default", "spark_catalog.default", "system.builtin"),
+ s"Expected literal + resolved CURRENT_SCHEMA preserved; got: $entries")
}
}
@@ -253,7 +255,7 @@ class SetPathSuite extends SharedSparkSession {
},
condition = "DUPLICATE_SQL_PATH_ENTRY",
sqlState = Some("42732"),
- parameters = Map("pathEntry" -> "spark_catalog.default"))
+ parameters = Map("pathEntry" -> "current_schema"))
}
}
@@ -568,4 +570,260 @@ class SetPathSuite extends SharedSparkSession {
}
}
}
+
+ // --- spark.sql.defaultPath (SQLConf.DEFAULT_PATH) ---
+ // The conf carries the SET PATH grammar; sessionPathEntries falls back to it lazily
+ // when no `SET PATH` has been issued, mirroring how `currentCatalog` falls back to
+ // [[SQLConf.DEFAULT_CATALOG]].
+
+ test("DEFAULT_PATH conf: lazy fallback when no SET PATH issued") {
+ withSQLConf(
+ SQLConf.PATH_ENABLED.key -> "true",
+ SQLConf.DEFAULT_PATH.key -> "spark_catalog.default, system.builtin") {
+ val catalogManager = spark.sessionState.catalogManager
+ val priorSessionPath = catalogManager.storedSessionPathEntries
+ catalogManager.clearSessionPath()
+ try {
+ val entries = pathEntries(currentPath())
+ assert(entries == Seq("spark_catalog.default", "system.builtin"),
+ s"Expected DEFAULT_PATH conf to drive current_path(); got: $entries")
+ assert(catalogManager.storedSessionPathEntries.isEmpty,
+ "DEFAULT_PATH lookup must not write to the in-memory stored session path")
+ } finally {
+ catalogManager.clearSessionPath()
+ priorSessionPath.foreach(catalogManager.setSessionPath)
+ }
+ }
+ }
+
+ test("DEFAULT_PATH conf: explicit SET PATH overrides the conf") {
+ withSQLConf(
+ SQLConf.PATH_ENABLED.key -> "true",
+ SQLConf.DEFAULT_PATH.key -> "system.builtin, system.session") {
+ val catalogManager = spark.sessionState.catalogManager
+ val priorSessionPath = catalogManager.storedSessionPathEntries
+ try {
+ sql("SET PATH = system.session, system.builtin")
+ val entries = pathEntries(currentPath())
+ assert(entries == Seq("system.session", "system.builtin"),
+ s"Expected SET PATH to win over DEFAULT_PATH conf; got: $entries")
+ } finally {
+ catalogManager.clearSessionPath()
+ priorSessionPath.foreach(catalogManager.setSessionPath)
+ }
+ }
+ }
+
+ test("DEFAULT_PATH conf: SET PATH = DEFAULT_PATH expands to the conf value") {
+ withSQLConf(
+ SQLConf.PATH_ENABLED.key -> "true",
+ SQLConf.DEFAULT_PATH.key -> "system.session, system.builtin, current_schema") {
+ val catalogManager = spark.sessionState.catalogManager
+ val priorSessionPath = catalogManager.storedSessionPathEntries
+ try {
+ sql("SET PATH = DEFAULT_PATH")
+ val entries = pathEntries(currentPath())
+ assert(entries.head.contains("system.session"),
+ s"DEFAULT_PATH expansion should follow conf order (session first); got: $entries")
+ assert(catalogManager.storedSessionPathEntries.isDefined,
+ "After SET PATH the in-memory stored session path should be populated")
+ } finally {
+ catalogManager.clearSessionPath()
+ priorSessionPath.foreach(catalogManager.setSessionPath)
+ }
+ }
+ }
+
+ test("DEFAULT_PATH conf: cycle break -- inner DEFAULT_PATH falls back to builtin order") {
+ withSQLConf(
+ SQLConf.PATH_ENABLED.key -> "true",
+ SQLConf.DEFAULT_PATH.key -> "DEFAULT_PATH",
+ // Pin order conf to "first" so the spark-builtin default ordering is observable.
+ SQLConf.SESSION_FUNCTION_RESOLUTION_ORDER.key -> "first") {
+ val catalogManager = spark.sessionState.catalogManager
+ val priorSessionPath = catalogManager.storedSessionPathEntries
+ catalogManager.clearSessionPath()
+ try {
+ val entries = pathEntries(currentPath())
+ assert(entries.head.contains("system.session"),
+ s"Inner DEFAULT_PATH should resolve to builtin order seeded by the order conf " +
+ s"('first' -> session leading); got: $entries")
+ } finally {
+ catalogManager.clearSessionPath()
+ priorSessionPath.foreach(catalogManager.setSessionPath)
+ }
+ }
+ }
+
+ test("DEFAULT_PATH conf: invalid value rejected on SET spark.sql.defaultPath") {
+ withPathEnabled {
+ val e = intercept[SparkIllegalArgumentException] {
+ sql("SET spark.sql.defaultPath = this is not a path")
+ }
+ assert(e.getCondition.startsWith("INVALID_CONF_VALUE"), e.getMessage)
+ }
+ }
+
+ test("DEFAULT_PATH conf: PATH keyword is rejected on SET spark.sql.defaultPath") {
+ withPathEnabled {
+ val e = intercept[SparkIllegalArgumentException] {
+ sql("SET spark.sql.defaultPath = PATH, system.builtin")
+ }
+ assert(e.getCondition.startsWith("INVALID_CONF_VALUE"), e.getMessage)
+ }
+ }
+
+ test("DEFAULT_PATH conf: PATH disabled returns no fallback") {
+ withSQLConf(
+ SQLConf.PATH_ENABLED.key -> "false",
+ SQLConf.DEFAULT_PATH.key -> "system.session, system.builtin") {
+ val catalogManager = spark.sessionState.catalogManager
+ assert(catalogManager.sessionPathEntries.isEmpty,
+ "DEFAULT_PATH conf must not take effect when PATH is disabled")
+ }
+ }
+
+ // --- Path-driven security check (built on the lazy DEFAULT_PATH fallback) ---
+ // The "block temp function shadowing builtin" check is now driven by the live PATH, so
+ // changes via SET PATH or DEFAULT_PATH take effect even when the legacy order conf is
+ // left at its default.
+
+ test("path-driven security check: SET PATH putting session before builtin blocks temp " +
+ "function with a builtin name") {
+ withPathEnabled {
+ val catalogManager = spark.sessionState.catalogManager
+ val priorSessionPath = catalogManager.storedSessionPathEntries
+ try {
+ // Default `sessionFunctionResolutionOrder` is "second" (builtin first), but SET PATH
+ // overrides that to put session first. The security check must reflect the live path.
+ sql("SET PATH = system.session, system.builtin")
+ val e = intercept[AnalysisException] {
+ sql("CREATE TEMPORARY FUNCTION count() RETURNS INT RETURN 1")
+ }
+ assert(e.getCondition == "ROUTINE_ALREADY_EXISTS", e.getMessage)
+ } finally {
+ sql("DROP TEMPORARY FUNCTION IF EXISTS session.count")
+ catalogManager.clearSessionPath()
+ priorSessionPath.foreach(catalogManager.setSessionPath)
+ }
+ }
+ }
+
+ test("path-driven security check: DEFAULT_PATH conf putting session before builtin " +
+ "blocks temp function with a builtin name (no SET PATH issued)") {
+ withSQLConf(
+ SQLConf.PATH_ENABLED.key -> "true",
+ SQLConf.DEFAULT_PATH.key -> "system.session, system.builtin") {
+ val catalogManager = spark.sessionState.catalogManager
+ val priorSessionPath = catalogManager.storedSessionPathEntries
+ catalogManager.clearSessionPath()
+ try {
+ // Order conf is left at its default ("second"). The path-driven gate must read
+ // DEFAULT_PATH and fire the security check for unqualified temp/builtin collisions.
+ val e = intercept[AnalysisException] {
+ sql("CREATE TEMPORARY FUNCTION count() RETURNS INT RETURN 1")
+ }
+ assert(e.getCondition == "ROUTINE_ALREADY_EXISTS", e.getMessage)
+ } finally {
+ sql("DROP TEMPORARY FUNCTION IF EXISTS session.count")
+ catalogManager.clearSessionPath()
+ priorSessionPath.foreach(catalogManager.setSessionPath)
+ }
+ }
+ }
+
+ test("PATH enabled: SET PATH with only user schemas does not implicitly resolve builtins") {
+ withPathEnabled {
+ sql("CREATE SCHEMA IF NOT EXISTS only_user_on_path")
+ try {
+ sql("SET PATH = spark_catalog.only_user_on_path")
+ val e = intercept[AnalysisException] {
+ sql("SELECT abs(-1)").collect()
+ }
+ assert(e.getCondition == "UNRESOLVED_ROUTINE", e.getMessage)
+ } finally {
+ sql("DROP SCHEMA IF EXISTS only_user_on_path")
+ }
+ }
+ }
+
+ test("single-pass: count(*) rewrite respects PATH temp-before-builtin gate") {
+ withSQLConf(
+ SQLConf.PATH_ENABLED.key -> "true",
+ SQLConf.ANALYZER_SINGLE_PASS_RESOLVER_ENABLED.key -> "true") {
+ sql("SET PATH = system.builtin, system.session")
+ sql("CREATE TEMPORARY FUNCTION count(x INT) RETURNS INT RETURN x")
+ try {
+ sql("SET PATH = system.session, system.builtin")
+ checkAnswer(
+ sql("SELECT count(*) FROM VALUES (CAST(NULL AS INT)) AS t(c)"),
+ Row(null))
+ } finally {
+ sql("DROP TEMPORARY FUNCTION IF EXISTS session.count")
+ }
+ }
+ }
+
+ test("PATH enabled: explicit SET PATH with system.session AFTER a user catalog still " +
+ "reaches temp functions") {
+ // Explicit paths are honored as written: placing `system.session` after a user catalog
+ // is the user's authorization for unqualified temp functions to resolve. Contrast with
+ // the implicit (no SET PATH, no DEFAULT_PATH) form, which preserves the security property
+ // of the seeded default path.
+ withPathEnabled {
+ sql("CREATE SCHEMA IF NOT EXISTS path_interleaved_user")
+ try {
+ sql("CREATE TEMPORARY FUNCTION path_interleaved_temp() RETURNS INT RETURN 7")
+ try {
+ sql("SET PATH = system.builtin, spark_catalog.path_interleaved_user, system.session")
+ checkAnswer(sql("SELECT path_interleaved_temp()"), Row(7))
+ } finally {
+ sql("DROP TEMPORARY FUNCTION IF EXISTS path_interleaved_temp")
+ }
+ } finally {
+ sql("DROP SCHEMA IF EXISTS path_interleaved_user")
+ }
+ }
+ }
+
+ test("PATH enabled: SET PATH with user schema before system.builtin still resolves builtins") {
+ // Exercises systemFunctionKindsFromPath with a user-catalog entry preceding
+ // system.builtin: the helper flat-scans the path, so Builtin still appears
+ // in the kinds list and unqualified `abs` resolves.
+ withPathEnabled {
+ sql("CREATE SCHEMA IF NOT EXISTS path_user_before_builtin")
+ try {
+ sql("SET PATH = spark_catalog.path_user_before_builtin, system.builtin")
+ // `abs` is a builtin; if Builtin did not appear in the kinds list,
+ // unqualified `abs(-1)` would fail with UNRESOLVED_ROUTINE.
+ checkAnswer(sql("SELECT abs(-1)"), Row(1))
+ } finally {
+ sql("DROP SCHEMA IF EXISTS path_user_before_builtin")
+ }
+ }
+ }
+
+ test("DEFAULT_PATH conf: duplicate entries are tolerated (first-match resolution)") {
+ // Lookup uses first-match resolution, so redundant entries on DEFAULT_PATH are dead code
+ // rather than an error. (Contrast with SET PATH, which still rejects static duplicates as
+ // a user-input typo guard.) This avoids a UX cliff where a USE SCHEMA could later wedge
+ // every unqualified function lookup with DUPLICATE_SQL_PATH_ENTRY.
+ withSQLConf(
+ SQLConf.PATH_ENABLED.key -> "true",
+ SQLConf.DEFAULT_PATH.key -> "system.builtin, system.builtin") {
+ val catalogManager = spark.sessionState.catalogManager
+ val priorSessionPath = catalogManager.storedSessionPathEntries
+ catalogManager.clearSessionPath()
+ try {
+ val entries = pathEntries(currentPath())
+ assert(entries == Seq("system.builtin", "system.builtin"),
+ s"DEFAULT_PATH duplicates should pass through to current_path(); got: $entries")
+ // Sanity: unqualified resolution still works (the second `system.builtin` is dead).
+ checkAnswer(sql("SELECT abs(-1)"), Row(1))
+ } finally {
+ catalogManager.clearSessionPath()
+ priorSessionPath.foreach(catalogManager.setSessionPath)
+ }
+ }
+ }
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
index 0fbc41492e00..354473582678 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -100,6 +100,8 @@ class HiveSessionStateBuilder(
override val singlePassPostHocResolutionRules: Seq[Rule[LogicalPlan]] =
DetectAmbiguousSelfJoin +:
ApplyCharTypePadding +:
+ ResolveSQLFunctions +:
+ ResolveDeserializer +:
singlePassCustomPostHocResolutionRules
override val singlePassExtendedResolutionChecks: Seq[LogicalPlan => Unit] = {
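
Reviewer note (illustrative only, not part of the patch): the SetPathSuite tests above exercise two behaviours that can be modelled in a few lines. The first is the fallback chain: a stored `SET PATH` wins, then the `spark.sql.defaultPath` conf, then a seeded default, with nothing returned when PATH is disabled. The second is first-match lookup, which makes duplicate entries harmless. The sketch below is a standalone toy model. `PathFallbackSketch`, `effectivePath`, `resolve`, and the contents of `seededDefault` are hypothetical names and values chosen for illustration; they are not Spark's `CatalogManager` API, and the comma split stands in for the real SET PATH grammar.

```scala
// Toy model of the fallback and lookup behaviour; all names are hypothetical.
object PathFallbackSketch {
  // Assumed placeholder for the seeded default order (builtin first).
  val seededDefault: Seq[String] = Seq("system.builtin", "system.session")

  // Effective path: stored SET PATH, else the conf value, else the seeded default.
  def effectivePath(
      pathEnabled: Boolean,
      storedSetPath: Option[Seq[String]],
      defaultPathConf: String): Seq[String] = {
    if (!pathEnabled) {
      Seq.empty
    } else {
      storedSetPath.getOrElse {
        // Naive comma split; the real conf is parsed with the SET PATH grammar.
        val parsed = defaultPathConf.split(",").map(_.trim).filter(_.nonEmpty).toSeq
        if (parsed.isEmpty) {
          seededDefault
        } else {
          // An inner DEFAULT_PATH token expands to the seeded default, never to
          // the conf itself, so the expansion cannot cycle.
          parsed.flatMap { entry =>
            if (entry.equalsIgnoreCase("DEFAULT_PATH")) seededDefault else Seq(entry)
          }
        }
      }
    }
  }

  // First-match lookup: returns the first path entry that defines `name`.
  // A duplicate entry later in the path is never reached.
  def resolve(
      path: Seq[String],
      functions: Map[String, Set[String]],
      name: String): Option[String] =
    path.find(entry => functions.getOrElse(entry, Set.empty[String]).contains(name))
}
```

Under this model, a path holding only a user schema leaves `abs` unresolved, and `system.builtin, system.builtin` resolves `abs` through the first entry, in line with the expectations the tests assert.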