This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fd836271b6f [SPARK-50403][SQL] Fix parameterized `EXECUTE IMMEDIATE`
1fd836271b6f is described below

commit 1fd836271b6f284625ba0f50551bea87071d5d65
Author: Max Gekk <[email protected]>
AuthorDate: Tue Jan 14 20:14:00 2025 +0300

    [SPARK-50403][SQL] Fix parameterized `EXECUTE IMMEDIATE`
    
    ### What changes were proposed in this pull request?
    1. Remove the assert for a single parameterized query because it restricts 
parameterization of `EXECUTE IMMEDIATE`. The assert checks that only a single 
node of the type `ParameterizedQuery` is present in a query, but `EXECUTE 
IMMEDIATE` adds one `ParameterizedQuery` and `sql()` adds another 
`ParameterizedQuery`. So, this case is prohibited by the assert even though it 
is a valid use case from the user's perspective.
    2. Modify parameter binding: stop the bind procedure when it encounters 
another parameterized query. For example, when the SQL text passed to 
`spark.sql()` contains `EXECUTE IMMEDIATE`, the `sql()` parameters don't affect 
the SQL query string in `EXECUTE IMMEDIATE`.
    3. Allow parameters in `EXECUTE IMMEDIATE` variables.
    
    ### Why are the changes needed?
    Before the changes, the following query fails with an internal error:
    ```scala
    scala> spark.sql("execute immediate 'select ?' using 1", Map("param1" -> 
"1"))
    org.apache.spark.SparkException: [INTERNAL_ERROR] The Spark SQL phase 
analysis failed with an internal error. You hit a bug in Spark or the Spark 
plugins you use. Please, report this bug to the corresponding communities or 
vendors, and provide the full stack trace. SQLSTATE: XX000
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, the query above returns proper results instead of the internal error:
    ```scala
    scala> spark.sql("execute immediate 'select ?' using 1", Map("param1" -> 
"1"))
    val res2: org.apache.spark.sql.DataFrame = [1: int]
    ```
    
    ### How was this patch tested?
    By running the new test:
    ```
    $ build/sbt "sql/test:testOnly org.apache.spark.sql.ParametersSuite"
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #49442 from MaxGekk/fix-params-execute-immediate.
    
    Authored-by: Max Gekk <[email protected]>
    Signed-off-by: Max Gekk <[email protected]>
---
 .../sql/catalyst/analysis/executeImmediate.scala   |  9 ++++--
 .../spark/sql/catalyst/analysis/parameters.scala   | 33 ++++++++--------------
 .../org/apache/spark/sql/ParametersSuite.scala     | 32 +++++++++++++++++++++
 3 files changed, 49 insertions(+), 25 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/executeImmediate.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/executeImmediate.scala
index c92171ec5c75..b452ca15bed5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/executeImmediate.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/executeImmediate.scala
@@ -54,15 +54,18 @@ class SubstituteExecuteImmediate(val catalogManager: 
CatalogManager)
   def resolveVariable(e: Expression): Expression = {
 
     /**
-     * We know that the expression is either UnresolvedAttribute or Alias, as 
passed from the
-     * parser. If it is an UnresolvedAttribute, we look it up in the catalog 
and return it. If it
-     * is an Alias, we resolve the child and return an Alias with the same 
name.
+     * We know that the expression is either UnresolvedAttribute, Alias or 
Parameter, as passed from
+     * the parser. If it is an UnresolvedAttribute, we look it up in the 
catalog and return it. If
+     * it is an Alias, we resolve the child and return an Alias with the same 
name. If it is
+     * a Parameter, we leave it as is because the parameter belongs to another 
parameterized
+     * query and should be resolved later.
      */
     e match {
       case u: UnresolvedAttribute =>
         getVariableReference(u, u.nameParts)
       case a: Alias =>
         Alias(resolveVariable(a.child), a.name)()
+      case p: Parameter => p
       case other =>
         throw QueryCompilationErrors.unsupportedParameterExpression(other)
     }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala
index de7374776946..2cfc2a8c90dc 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala
@@ -104,18 +104,6 @@ case class PosParameterizedQuery(child: LogicalPlan, args: 
Seq[Expression])
     copy(child = newChild)
 }
 
-/**
- * Base class for rules that process parameterized queries.
- */
-abstract class ParameterizedQueryProcessor extends Rule[LogicalPlan] {
-  def assertUnresolvedPlanHasSingleParameterizedQuery(plan: LogicalPlan): Unit 
= {
-    if (plan.containsPattern(PARAMETERIZED_QUERY)) {
-      val parameterizedQueries = plan.collect { case p: ParameterizedQuery => 
p }
-      assert(parameterizedQueries.length == 1)
-    }
-  }
-}
-
 /**
  * Moves `ParameterizedQuery` inside `SupervisingCommand` for their supervised 
plans to be
  * resolved later by the analyzer.
@@ -127,10 +115,8 @@ abstract class ParameterizedQueryProcessor extends 
Rule[LogicalPlan] {
  * `PosParameterizedQuery(ExplainCommand(ExplainCommand(SomeQuery(...))))` =>
  * `ExplainCommand(ExplainCommand(PosParameterizedQuery(SomeQuery(...))))`
  */
-object MoveParameterizedQueriesDown extends ParameterizedQueryProcessor {
+object MoveParameterizedQueriesDown extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    assertUnresolvedPlanHasSingleParameterizedQuery(plan)
-
     plan.resolveOperatorsWithPruning(_.containsPattern(PARAMETERIZED_QUERY)) {
       case pq: ParameterizedQuery if pq.exists(isSupervisingCommand) =>
         moveParameterizedQueryIntoSupervisingCommand(pq)
@@ -161,7 +147,7 @@ object MoveParameterizedQueriesDown extends 
ParameterizedQueryProcessor {
  * by collection constructor functions such as `map()`, `array()`, `struct()`
  * from the user-specified arguments.
  */
-object BindParameters extends ParameterizedQueryProcessor with QueryErrorsBase 
{
+object BindParameters extends Rule[LogicalPlan] with QueryErrorsBase {
   private def checkArgs(args: Iterable[(String, Expression)]): Unit = {
     def isNotAllowed(expr: Expression): Boolean = expr.exists {
       case _: Literal | _: CreateArray | _: CreateNamedStruct |
@@ -176,15 +162,18 @@ object BindParameters extends ParameterizedQueryProcessor 
with QueryErrorsBase {
     }
   }
 
-  private def bind(p: LogicalPlan)(f: PartialFunction[Expression, 
Expression]): LogicalPlan = {
-    p.resolveExpressionsWithPruning(_.containsPattern(PARAMETER)) (f orElse {
-      case sub: SubqueryExpression => sub.withNewPlan(bind(sub.plan)(f))
-    })
+  private def bind(p0: LogicalPlan)(f: PartialFunction[Expression, 
Expression]): LogicalPlan = {
+    var stop = false
+    p0.resolveOperatorsDownWithPruning(_.containsPattern(PARAMETER) && !stop) {
+      case p1 =>
+        stop = p1.isInstanceOf[ParameterizedQuery]
+        p1.transformExpressionsWithPruning(_.containsPattern(PARAMETER)) (f 
orElse {
+          case sub: SubqueryExpression => sub.withNewPlan(bind(sub.plan)(f))
+        })
+    }
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    assertUnresolvedPlanHasSingleParameterizedQuery(plan)
-
     plan.resolveOperatorsWithPruning(_.containsPattern(PARAMETERIZED_QUERY)) {
       // We should wait for `CTESubstitution` to resolve CTE before binding 
parameters, as CTE
       // relations are not children of `UnresolvedWith`.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala
index 2ac8ed26868a..bb1363f1c58c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala
@@ -769,4 +769,36 @@ class ParametersSuite extends QueryTest with 
SharedSparkSession with PlanTest {
     checkAnswer(spark.sql(query(":cte"), args = Map("cte" -> "t1")), Row(1))
     checkAnswer(spark.sql(query("?"), args = Array("t1")), Row(1))
   }
+
+  test("SPARK-50403: parameterized execute immediate") {
+    checkAnswer(spark.sql("execute immediate 'select ?' using ?", Array(1)), 
Row(1))
+    checkAnswer(spark.sql("execute immediate 'select ?, ?' using ?, 2", 
Array(1)), Row(1, 2))
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.sql("execute immediate 'select ?, ?' using 1", Array(2))
+      },
+      condition = "UNBOUND_SQL_PARAMETER",
+      parameters = Map("name" -> "_10"),
+      context = ExpectedContext("?", 10, 10))
+
+    checkAnswer(spark.sql("execute immediate 'select ?' using 1", Map("param1" 
-> "1")), Row(1))
+    checkAnswer(spark.sql("execute immediate 'select :param1' using :param2 as 
param1",
+      Map("param2" -> 42)), Row(42))
+    checkAnswer(spark.sql(
+      "execute immediate 'select :param1, :param2' using :param2 as param1, 43 
as param2",
+      Map("param2" -> 42)), Row(42, 43))
+    checkAnswer(spark.sql("execute immediate 'select :param' using 0 as param",
+      Map("param" -> 42)), Row(0))
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.sql("execute immediate 'select :param1, :param2' using 1 as 
param1",
+          Map("param2" -> 2))
+      },
+      condition = "UNBOUND_SQL_PARAMETER",
+      parameters = Map("name" -> "param2"),
+      context = ExpectedContext(":param2", 16, 22))
+
+    checkAnswer(spark.sql("execute immediate 'select ?' using :param", 
Map("param" -> 2)), Row(2))
+    checkAnswer(spark.sql("execute immediate 'select :param' using ? as 
param", Array(3)), Row(3))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to