This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 1fd836271b6f [SPARK-50403][SQL] Fix parameterized `EXECUTE IMMEDIATE`
1fd836271b6f is described below
commit 1fd836271b6f284625ba0f50551bea87071d5d65
Author: Max Gekk <[email protected]>
AuthorDate: Tue Jan 14 20:14:00 2025 +0300
[SPARK-50403][SQL] Fix parameterized `EXECUTE IMMEDIATE`
### What changes were proposed in this pull request?
1. Remove the assert of a single parameterized query because it restricts
parameterization of `EXECUTE IMMEDIATE`. The assert checks that only a single
node of the type `ParameterizedQuery` is present in a query, but `EXECUTE
IMMEDIATE` adds one `ParameterizedQuery` and `sql()` adds another one. So, the
assert prohibits this case even though it is a valid use case from the user's
perspective.
2. Modify parameter binding: stop the bind procedure when it reaches another
parameterized query. For example, when the SQL text passed to `spark.sql()`
contains `EXECUTE IMMEDIATE`, the `sql()` parameters do not affect the SQL
query string inside `EXECUTE IMMEDIATE`.
3. Allow parameters in `EXECUTE IMMEDIATE` variables (see the sketch after this list).
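In practice, the scoping from items 2 and 3 looks like this (a minimal sketch mirroring the new tests in `ParametersSuite`; expected results in comments):
```scala
// The outer sql() arguments and the parameters of the query nested in
// EXECUTE IMMEDIATE are bound independently: `?` inside the quoted query is
// bound by the USING clause, while :param in the USING clause is bound by
// the sql() arguments.
spark.sql("execute immediate 'select ?' using :param", Map("param" -> 2))  // Row(2)
spark.sql("execute immediate 'select :param' using ? as param", Array(3))  // Row(3)
```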
### Why are the changes needed?
Before the changes, the following query fails with an internal error:
```scala
scala> spark.sql("execute immediate 'select ?' using 1", Map("param1" ->
"1"))
org.apache.spark.SparkException: [INTERNAL_ERROR] The Spark SQL phase
analysis failed with an internal error. You hit a bug in Spark or the Spark
plugins you use. Please, report this bug to the corresponding communities or
vendors, and provide the full stack trace. SQLSTATE: XX000
```
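The error comes from the assert removed in item 1: `sql()` wraps the parsed plan in one `ParameterizedQuery` node for the `Map` arguments, and `EXECUTE IMMEDIATE` introduces another one for its `USING` clause, so the unresolved plan no longer contains a single `ParameterizedQuery`. Roughly (a non-runnable sketch of the plan shape; node names from `parameters.scala`):
```scala
// NameParameterizedQuery(             <- added by sql(..., Map("param1" -> "1"))
//   <EXECUTE IMMEDIATE statement>
//     PosParameterizedQuery(          <- added for the positional USING argument
//       <the parsed 'select ?' query>))
```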
### Does this PR introduce _any_ user-facing change?
Yes, the query above returns proper results instead of the internal error:
```scala
scala> spark.sql("execute immediate 'select ?' using 1", Map("param1" -> "1"))
val res2: org.apache.spark.sql.DataFrame = [1: int]
```
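A subtlety covered by the new tests: if a `USING` variable and an outer `sql()` argument share a name, the `USING` clause wins, because the nested query's parameters belong to `EXECUTE IMMEDIATE` (example taken from `ParametersSuite`):
```scala
// :param in the quoted query is bound by the USING clause, not by the outer
// sql() argument of the same name, so this returns Row(0) rather than Row(42).
spark.sql("execute immediate 'select :param' using 0 as param", Map("param" -> 42))
```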
### How was this patch tested?
By running the new test:
```
$ build/sbt "sql/test:testOnly org.apache.spark.sql.ParametersSuite"
```
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #49442 from MaxGekk/fix-params-execute-immediate.
Authored-by: Max Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
---
.../sql/catalyst/analysis/executeImmediate.scala | 9 ++++--
.../spark/sql/catalyst/analysis/parameters.scala | 33 ++++++++--------------
.../org/apache/spark/sql/ParametersSuite.scala | 32 +++++++++++++++++++++
3 files changed, 49 insertions(+), 25 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/executeImmediate.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/executeImmediate.scala
index c92171ec5c75..b452ca15bed5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/executeImmediate.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/executeImmediate.scala
@@ -54,15 +54,18 @@ class SubstituteExecuteImmediate(val catalogManager: CatalogManager)
   def resolveVariable(e: Expression): Expression = {
 
     /**
-     * We know that the expression is either UnresolvedAttribute or Alias, as passed from the
-     * parser. If it is an UnresolvedAttribute, we look it up in the catalog and return it. If it
-     * is an Alias, we resolve the child and return an Alias with the same name.
+     * We know that the expression is either UnresolvedAttribute, Alias or Parameter, as passed from
+     * the parser. If it is an UnresolvedAttribute, we look it up in the catalog and return it. If
+     * it is an Alias, we resolve the child and return an Alias with the same name. If it is
+     * a Parameter, we leave it as is because the parameter belongs to another parameterized
+     * query and should be resolved later.
      */
     e match {
       case u: UnresolvedAttribute =>
         getVariableReference(u, u.nameParts)
       case a: Alias =>
         Alias(resolveVariable(a.child), a.name)()
+      case p: Parameter => p
       case other =>
         throw QueryCompilationErrors.unsupportedParameterExpression(other)
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala
index de7374776946..2cfc2a8c90dc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala
@@ -104,18 +104,6 @@ case class PosParameterizedQuery(child: LogicalPlan, args: Seq[Expression])
     copy(child = newChild)
 }
 
-/**
- * Base class for rules that process parameterized queries.
- */
-abstract class ParameterizedQueryProcessor extends Rule[LogicalPlan] {
-  def assertUnresolvedPlanHasSingleParameterizedQuery(plan: LogicalPlan): Unit = {
-    if (plan.containsPattern(PARAMETERIZED_QUERY)) {
-      val parameterizedQueries = plan.collect { case p: ParameterizedQuery => p }
-      assert(parameterizedQueries.length == 1)
-    }
-  }
-}
-
 /**
  * Moves `ParameterizedQuery` inside `SupervisingCommand` for their supervised plans to be
  * resolved later by the analyzer.
@@ -127,10 +115,8 @@ abstract class ParameterizedQueryProcessor extends Rule[LogicalPlan] {
  * `PosParameterizedQuery(ExplainCommand(ExplainCommand(SomeQuery(...))))` =>
  * `ExplainCommand(ExplainCommand(PosParameterizedQuery(SomeQuery(...))))`
  */
-object MoveParameterizedQueriesDown extends ParameterizedQueryProcessor {
+object MoveParameterizedQueriesDown extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    assertUnresolvedPlanHasSingleParameterizedQuery(plan)
-
     plan.resolveOperatorsWithPruning(_.containsPattern(PARAMETERIZED_QUERY)) {
       case pq: ParameterizedQuery if pq.exists(isSupervisingCommand) =>
         moveParameterizedQueryIntoSupervisingCommand(pq)
@@ -161,7 +147,7 @@ object MoveParameterizedQueriesDown extends ParameterizedQueryProcessor {
  * by collection constructor functions such as `map()`, `array()`, `struct()`
  * from the user-specified arguments.
  */
-object BindParameters extends ParameterizedQueryProcessor with QueryErrorsBase {
+object BindParameters extends Rule[LogicalPlan] with QueryErrorsBase {
   private def checkArgs(args: Iterable[(String, Expression)]): Unit = {
     def isNotAllowed(expr: Expression): Boolean = expr.exists {
       case _: Literal | _: CreateArray | _: CreateNamedStruct |
@@ -176,15 +162,18 @@ object BindParameters extends ParameterizedQueryProcessor with QueryErrorsBase {
     }
   }
 
-  private def bind(p: LogicalPlan)(f: PartialFunction[Expression, Expression]): LogicalPlan = {
-    p.resolveExpressionsWithPruning(_.containsPattern(PARAMETER)) (f orElse {
-      case sub: SubqueryExpression => sub.withNewPlan(bind(sub.plan)(f))
-    })
+  private def bind(p0: LogicalPlan)(f: PartialFunction[Expression, Expression]): LogicalPlan = {
+    var stop = false
+    p0.resolveOperatorsDownWithPruning(_.containsPattern(PARAMETER) && !stop) {
+      case p1 =>
+        stop = p1.isInstanceOf[ParameterizedQuery]
+        p1.transformExpressionsWithPruning(_.containsPattern(PARAMETER)) (f orElse {
+          case sub: SubqueryExpression => sub.withNewPlan(bind(sub.plan)(f))
+        })
+    }
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    assertUnresolvedPlanHasSingleParameterizedQuery(plan)
-
     plan.resolveOperatorsWithPruning(_.containsPattern(PARAMETERIZED_QUERY)) {
       // We should wait for `CTESubstitution` to resolve CTE before binding parameters, as CTE
       // relations are not children of `UnresolvedWith`.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala
index 2ac8ed26868a..bb1363f1c58c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala
@@ -769,4 +769,36 @@ class ParametersSuite extends QueryTest with SharedSparkSession with PlanTest {
     checkAnswer(spark.sql(query(":cte"), args = Map("cte" -> "t1")), Row(1))
     checkAnswer(spark.sql(query("?"), args = Array("t1")), Row(1))
   }
+
+  test("SPARK-50403: parameterized execute immediate") {
+    checkAnswer(spark.sql("execute immediate 'select ?' using ?", Array(1)), Row(1))
+    checkAnswer(spark.sql("execute immediate 'select ?, ?' using ?, 2", Array(1)), Row(1, 2))
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.sql("execute immediate 'select ?, ?' using 1", Array(2))
+      },
+      condition = "UNBOUND_SQL_PARAMETER",
+      parameters = Map("name" -> "_10"),
+      context = ExpectedContext("?", 10, 10))
+
+    checkAnswer(spark.sql("execute immediate 'select ?' using 1", Map("param1" -> "1")), Row(1))
+    checkAnswer(spark.sql("execute immediate 'select :param1' using :param2 as param1",
+      Map("param2" -> 42)), Row(42))
+    checkAnswer(spark.sql(
+      "execute immediate 'select :param1, :param2' using :param2 as param1, 43 as param2",
+      Map("param2" -> 42)), Row(42, 43))
+    checkAnswer(spark.sql("execute immediate 'select :param' using 0 as param",
+      Map("param" -> 42)), Row(0))
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.sql("execute immediate 'select :param1, :param2' using 1 as param1",
+          Map("param2" -> 2))
+      },
+      condition = "UNBOUND_SQL_PARAMETER",
+      parameters = Map("name" -> "param2"),
+      context = ExpectedContext(":param2", 16, 22))
+
+    checkAnswer(spark.sql("execute immediate 'select ?' using :param", Map("param" -> 2)), Row(2))
+    checkAnswer(spark.sql("execute immediate 'select :param' using ? as param", Array(3)), Row(3))
+  }
 }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]