This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 66162360e773 [SPARK-48344][SQL] Enhance SQL Script Execution: Replace NOOP with COLLECT for Result DataFrames
66162360e773 is described below

commit 66162360e773f0c37afe1200b4539caff6423143
Author: Milan Dankovic <[email protected]>
AuthorDate: Fri Jan 10 09:05:19 2025 +0800

    [SPARK-48344][SQL] Enhance SQL Script Execution: Replace NOOP with COLLECT for Result DataFrames
    
    ### What changes were proposed in this pull request?
    This pull request proposes replacing the **noop** operation with **collect** for all _result_ DataFrames on the caller side of the SQL Script execution process.
    
    This is the 4th PR in the series introducing SQL Scripting Execution into Spark.
    
    ### Why are the changes needed?
    The proposed change is necessary to maintain a **critical invariant during SQL Script execution**: when `SqlScriptingExecution` returns the next available result statement, it must be executed before proceeding with iteration.
    
    ### Implementation details
    SQL Script execution is based on iterating over interpreted statements and executing them as they are encountered. For certain statement types (_result_ statements), execution is delegated to the caller side (`SparkSession`). To achieve this, the iteration process is divided into two stages:
    
    - All Compound Execution Nodes (Begin-End blocks, control-flow structures, loops) implement an iterator accessible via the `getTreeIterator` method.
    - `SqlScriptingExecution` serves as a second-level iterator, iterating over all statements and executing those that **are not** _result_ statements. _Result_ statements are returned to the caller for execution. The caller must adhere to the contract of executing the returned statement before continuing iteration (see the sketches below).
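
    As a condensed sketch of the first stage (mirroring `SqlScriptingExecution.getNextStatement` in the diff below; details omitted), the frame handling looks roughly like this:

    ```scala
    // Condensed from SqlScriptingExecution in this patch; not the full implementation.
    private def getNextStatement: Option[CompoundStatementExec] = {
      // Drop frames that are already fully executed.
      while (context.frames.nonEmpty && !context.frames.last.hasNext) {
        context.frames.remove(context.frames.size - 1)
      }
      // If a frame is still available, pull the next statement from it.
      if (context.frames.nonEmpty) Some(context.frames.last.next()) else None
    }
    ```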
    
    Due to the nature of this contract between the caller and the `SqlScriptingExecution` API, implementing the Java `Iterator` interface is not feasible. The caller is expected to call `getNextResult` until it returns `None`. We will enforce correct usage of the `SqlScriptingExecution` API through future PR reviews.
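
    For illustration, here is a sketch of the expected caller pattern (mirroring the `SparkSession` change in this patch; `script`, `spark`, and `args` stand in for the caller's inputs):

    ```scala
    val sse = new SqlScriptingExecution(script, spark, args)
    var result: Option[Seq[Row]] = None

    var df: Option[DataFrame] = sse.getNextResult
    while (df.isDefined) {
      sse.withErrorHandling {
        // Execute (collect) the returned DataFrame before advancing the script.
        result = Some(df.get.collect().toSeq)
      }
      df = sse.getNextResult
    }
    ```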
    
    In this approach we collect every DataFrame, which eliminates concerns about which one needs to be returned last. The same strategy will also be used when we introduce the multiple-results API.
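
    The updated test helper follows the same pattern, buffering every collected result rather than only the last one (sketched from the test change below):

    ```scala
    import scala.collection.mutable.ListBuffer

    val results: ListBuffer[Array[Row]] = ListBuffer.empty
    var df = sse.getNextResult
    while (df.isDefined) {
      results.append(df.get.collect()) // every result is kept, not just the last
      df = sse.getNextResult
    }
    ```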
    
    ### Impact and Considerations
    This change enhances the robustness of our SQL Script execution process and lays the groundwork for future improvements, including the implementation of a multiple-results API. Reviewers should pay particular attention to the handling of DataFrame collection and to preserving execution order.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #49372 from miland-db/milan-dankovic_data/refactor-execution-3-followup.
    
    Authored-by: Milan Dankovic <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../plans/logical/SqlScriptingLogicalPlans.scala   |  2 ++
 .../scala/org/apache/spark/sql/SparkSession.scala  | 17 +++++-----
 .../sql/scripting/SqlScriptingExecution.scala      | 36 ++++++++++++----------
 .../scripting/SqlScriptingExecutionContext.scala   |  3 +-
 .../sql/scripting/SqlScriptingExecutionNode.scala  |  3 +-
 .../sql/scripting/SqlScriptingExecutionSuite.scala | 12 +++++++-
 .../scripting/SqlScriptingInterpreterSuite.scala   |  2 +-
 7 files changed, 46 insertions(+), 29 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SqlScriptingLogicalPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SqlScriptingLogicalPlans.scala
index 207c586996fd..ad00a5216b4c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SqlScriptingLogicalPlans.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SqlScriptingLogicalPlans.scala
@@ -62,6 +62,8 @@ case class SingleStatement(parsedPlan: LogicalPlan)
  * @param label Label set to CompoundBody by user or UUID otherwise.
  *              It can be None in case when CompoundBody is not part of BeginEndCompoundBlock
  *              for example when CompoundBody is inside loop or conditional block.
+ * @param isScope Flag indicating if the CompoundBody is a labeled scope.
+ *                Scopes are used for grouping local variables and exception handlers.
  */
 case class CompoundBody(
     collection: Seq[CompoundPlanStatement],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 878fdc8e267a..3b36f6b59cb3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -448,16 +448,17 @@ class SparkSession private(
     val sse = new SqlScriptingExecution(script, this, args)
     var result: Option[Seq[Row]] = None
 
-    while (sse.hasNext) {
+    // We must execute the returned df before calling sse.getNextResult again because
+    // sse.getNextResult advances the script execution and executes all statements until
+    // the next result. We must collect results immediately to maintain execution order.
+    // This ensures we respect the contract of the SqlScriptingExecution API.
+    var df: Option[DataFrame] = sse.getNextResult
+    while (df.isDefined) {
       sse.withErrorHandling {
-        val df = sse.next()
-        if (sse.hasNext) {
-          df.write.format("noop").mode("overwrite").save()
-        } else {
-          // Collect results from the last DataFrame.
-          result = Some(df.collect().toSeq)
-        }
+        // Collect results from the current DataFrame.
+        result = Some(df.get.collect().toSeq)
       }
+      df = sse.getNextResult
     }
 
     if (result.isEmpty) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecution.scala
index 71b44cbbd070..2b15a6c55fa9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecution.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.scripting
 
-import org.apache.spark.SparkException
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, CompoundBody}
@@ -25,6 +24,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, CompoundBody}
 /**
  * SQL scripting executor - executes script and returns result statements.
  * This supports returning multiple result statements from a single script.
+ * The caller of the SqlScriptingExecution API must adhere to the contract of executing
+ * the returned statement before continuing iteration. Executing the statement needs to be done
+ * inside withErrorHandling block.
  *
  * @param sqlScript CompoundBody which need to be executed.
  * @param session Spark session that SQL script is executed within.
@@ -33,7 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, CompoundBody}
 class SqlScriptingExecution(
     sqlScript: CompoundBody,
     session: SparkSession,
-    args: Map[String, Expression]) extends Iterator[DataFrame] {
+    args: Map[String, Expression]) {
 
   private val interpreter = SqlScriptingInterpreter(session)
 
@@ -42,7 +44,7 @@ class SqlScriptingExecution(
     val ctx = new SqlScriptingExecutionContext()
     val executionPlan = interpreter.buildExecutionPlan(sqlScript, args, ctx)
     // Add frame which represents SQL Script to the context.
-    ctx.frames.addOne(new SqlScriptingExecutionFrame(executionPlan.getTreeIterator))
+    ctx.frames.append(new SqlScriptingExecutionFrame(executionPlan.getTreeIterator))
     // Enter the scope of the top level compound.
     // We don't need to exit this scope explicitly as it will be done automatically
     // when the frame is removed during iteration.
@@ -50,32 +52,32 @@ class SqlScriptingExecution(
     ctx
   }
 
-  private var current: Option[DataFrame] = getNextResult
-
-  override def hasNext: Boolean = current.isDefined
-
-  override def next(): DataFrame = {
-    current match {
-      case None => throw SparkException.internalError("No more elements to iterate through.")
-      case Some(result) =>
-        current = getNextResult
-        result
-    }
-  }
 
   /** Helper method to iterate get next statements from the first available frame. */
   private def getNextStatement: Option[CompoundStatementExec] = {
+    // Remove frames that are already executed.
     while (context.frames.nonEmpty && !context.frames.last.hasNext) {
       context.frames.remove(context.frames.size - 1)
     }
+    // If there are still frames available, get the next statement.
     if (context.frames.nonEmpty) {
       return Some(context.frames.last.next())
     }
     None
   }
 
-  /** Helper method to iterate through statements until next result statement is encountered. */
-  private def getNextResult: Option[DataFrame] = {
+  /**
+   * Advances through the script and executes statements until a result statement or
+   * end of script is encountered.
+   *
+   * To know if there is result statement available, the method has to advance through script and
+   * execute statements until the result statement or end of script is encountered. For that reason
+   * the returned result must be executed before subsequent calls. Multiple calls without executing
+   * the intermediate results will lead to incorrect behavior.
+   *
+   * @return Result DataFrame if it is available, otherwise None.
+   */
+  def getNextResult: Option[DataFrame] = {
     var currentStatement = getNextStatement
     // While we don't have a result statement, execute the statements.
     while (currentStatement.isDefined) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionContext.scala
index 5a2ef62e3bb7..94462ab828f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionContext.scala
@@ -63,7 +63,7 @@ class SqlScriptingExecutionFrame(
   }
 
   def enterScope(label: String): Unit = {
-    scopes.addOne(new SqlScriptingExecutionScope(label))
+    scopes.append(new SqlScriptingExecutionScope(label))
   }
 
   def exitScope(label: String): Unit = {
@@ -76,6 +76,7 @@ class SqlScriptingExecutionFrame(
       scopes.remove(scopes.length - 1)
     }
 
+    // Remove the scope with the given label.
     if (scopes.nonEmpty) {
       scopes.remove(scopes.length - 1)
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionNode.scala
index ee47491b803b..58cbfb0feb01 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionNode.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionNode.scala
@@ -181,7 +181,8 @@ class NoOpStatementExec extends LeafStatementExec {
  * @param label
  *   Label set by user to CompoundBody or None otherwise.
  * @param isScope
- *   Flag that indicates whether Compound Body is scope or not.
+ *   Flag indicating if the CompoundBody is a labeled scope.
+ *   Scopes are used for grouping local variables and exception handlers.
  * @param context
  *   SqlScriptingExecutionContext keeps the execution state of current script.
  */
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionSuite.scala
index bbeae942f9fe..5b5285ea1327 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.scripting
 
+import scala.collection.mutable.ListBuffer
+
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.catalyst.expressions.Expression
@@ -43,7 +45,7 @@ class SqlScriptingExecutionSuite extends QueryTest with SharedSparkSession {
       args: Map[String, Expression] = Map.empty): Seq[Array[Row]] = {
     val compoundBody = spark.sessionState.sqlParser.parsePlan(sqlText).asInstanceOf[CompoundBody]
     val sse = new SqlScriptingExecution(compoundBody, spark, args)
-    sse.map { df => df.collect() }.toList
+    val result: ListBuffer[Array[Row]] = ListBuffer.empty
+
+    var df = sse.getNextResult
+    while (df.isDefined) {
+      // Collect results from the current DataFrame.
+      result.append(df.get.collect())
+      df = sse.getNextResult
+    }
+    result.toSeq
   }
 
   private def verifySqlScriptResult(sqlText: String, expected: Seq[Seq[Row]]): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingInterpreterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingInterpreterSuite.scala
index 20997504b15e..c7439a8934d7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingInterpreterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingInterpreterSuite.scala
@@ -49,7 +49,7 @@ class SqlScriptingInterpreterSuite extends QueryTest with SharedSparkSession {
     // Initialize context so scopes can be entered correctly.
     val context = new SqlScriptingExecutionContext()
     val executionPlan = interpreter.buildExecutionPlan(compoundBody, args, context)
-    context.frames.addOne(new SqlScriptingExecutionFrame(executionPlan.getTreeIterator))
+    context.frames.append(new SqlScriptingExecutionFrame(executionPlan.getTreeIterator))
     executionPlan.enterScope()
 
     executionPlan.getTreeIterator.flatMap {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
