This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 66162360e773 [SPARK-48344][SQL] Enhance SQL Script Execution: Replace NOOP with COLLECT for Result DataFrames
66162360e773 is described below
commit 66162360e773f0c37afe1200b4539caff6423143
Author: Milan Dankovic <[email protected]>
AuthorDate: Fri Jan 10 09:05:19 2025 +0800
[SPARK-48344][SQL] Enhance SQL Script Execution: Replace NOOP with COLLECT for Result DataFrames
### What changes were proposed in this pull request?
This pull request proposes replacing the **noop** operation with
**collect** for all _result_ DataFrames on the caller side of the SQL Script
execution process.
This is the 4th PR in the series introducing SQL Scripting Execution into Spark.
### Why are the changes needed?
The proposed change is necessary to maintain a **critical invariant during
SQL Script execution**: when `SqlScriptingExecution` returns the next available
result statement, it must be executed before proceeding with iteration.
### Implementation details
SQL Script execution is based on iterating over interpreted statements and
executing them as they are encountered. For certain statement types (_result_
statements), execution is delegated to the caller side (`SparkSession`). To
achieve this, the iteration process is divided into two stages:
- All Compound Execution Nodes (Begin-End blocks, control flow structures, loops) implement an iterator accessible via the `getTreeIterator` method.
- `SqlScriptingExecution` serves as a second-level iterator, iterating over
all statements and executing those that **are not** _result_ statements.
_Result_ statements are returned to the caller for execution on the caller
side. The caller must adhere to the contract of executing the returned
statement before continuing iteration.
Due to the nature of this contract between the caller and the `SqlScriptingExecution` API, implementing the Java Iterator interface is not feasible. The caller is expected to call `getNextResult` until it returns `None`. We will enforce correct usage of the `SqlScriptingExecution` API through the future PR review process.
In this approach we collect every DataFrame to eliminate concerns about
which one needs to be returned last. This strategy will also be utilized when
we introduce multiple-results API functionality.
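For illustration, here is a minimal sketch of the caller-side loop this contract implies, mirroring the `SparkSession` change in the diff below (the `script`, `args`, and `spark` values are assumed to already be in scope):

```scala
// Minimal sketch of the caller contract: collect each result DataFrame
// immediately, because the next getNextResult call advances script execution
// and executes all statements up to the next result statement.
val sse = new SqlScriptingExecution(script, spark, args)
var result: Option[Seq[Row]] = None

var df: Option[DataFrame] = sse.getNextResult
while (df.isDefined) {
  sse.withErrorHandling {
    // Execute (collect) the returned DataFrame before iterating further.
    result = Some(df.get.collect().toSeq)
  }
  df = sse.getNextResult
}
```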
### Impact and Considerations
This change enhances the robustness of our SQL Script execution process and
lays the groundwork for future improvements, including the implementation of a
multiple-results API. Reviewers should pay particular attention to the handling
of DataFrame collection and the maintenance of execution order integrity.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #49372 from miland-db/milan-dankovic_data/refactor-execution-3-followup.
Authored-by: Milan Dankovic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../plans/logical/SqlScriptingLogicalPlans.scala | 2 ++
.../scala/org/apache/spark/sql/SparkSession.scala | 17 +++++-----
.../sql/scripting/SqlScriptingExecution.scala | 36 ++++++++++++----------
.../scripting/SqlScriptingExecutionContext.scala | 3 +-
.../sql/scripting/SqlScriptingExecutionNode.scala | 3 +-
.../sql/scripting/SqlScriptingExecutionSuite.scala | 12 +++++++-
.../scripting/SqlScriptingInterpreterSuite.scala | 2 +-
7 files changed, 46 insertions(+), 29 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SqlScriptingLogicalPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SqlScriptingLogicalPlans.scala
index 207c586996fd..ad00a5216b4c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SqlScriptingLogicalPlans.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SqlScriptingLogicalPlans.scala
@@ -62,6 +62,8 @@ case class SingleStatement(parsedPlan: LogicalPlan)
* @param label Label set to CompoundBody by user or UUID otherwise.
* It can be None in case when CompoundBody is not part of BeginEndCompoundBlock
* for example when CompoundBody is inside loop or conditional block.
+ * @param isScope Flag indicating if the CompoundBody is a labeled scope.
+ * Scopes are used for grouping local variables and exception handlers.
*/
case class CompoundBody(
collection: Seq[CompoundPlanStatement],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 878fdc8e267a..3b36f6b59cb3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -448,16 +448,17 @@ class SparkSession private(
val sse = new SqlScriptingExecution(script, this, args)
var result: Option[Seq[Row]] = None
- while (sse.hasNext) {
+ // We must execute returned df before calling sse.getNextResult again because sse.hasNext
+ // advances the script execution and executes all statements until the next result. We must
+ // collect results immediately to maintain execution order.
+ // This ensures we respect the contract of SqlScriptingExecution API.
+ var df: Option[DataFrame] = sse.getNextResult
+ while (df.isDefined) {
sse.withErrorHandling {
- val df = sse.next()
- if (sse.hasNext) {
- df.write.format("noop").mode("overwrite").save()
- } else {
- // Collect results from the last DataFrame.
- result = Some(df.collect().toSeq)
- }
+ // Collect results from the current DataFrame.
+ result = Some(df.get.collect().toSeq)
}
+ df = sse.getNextResult
}
if (result.isEmpty) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecution.scala
index 71b44cbbd070..2b15a6c55fa9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecution.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.scripting
-import org.apache.spark.SparkException
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, CompoundBody}
@@ -25,6 +24,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, CompoundBody}
/**
* SQL scripting executor - executes script and returns result statements.
* This supports returning multiple result statements from a single script.
+ * The caller of the SqlScriptingExecution API must adhere to the contract of executing
+ * the returned statement before continuing iteration. Executing the statement needs to be done
+ * inside withErrorHandling block.
*
* @param sqlScript CompoundBody which need to be executed.
* @param session Spark session that SQL script is executed within.
@@ -33,7 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, CompoundBody}
class SqlScriptingExecution(
sqlScript: CompoundBody,
session: SparkSession,
- args: Map[String, Expression]) extends Iterator[DataFrame] {
+ args: Map[String, Expression]) {
private val interpreter = SqlScriptingInterpreter(session)
@@ -42,7 +44,7 @@ class SqlScriptingExecution(
val ctx = new SqlScriptingExecutionContext()
val executionPlan = interpreter.buildExecutionPlan(sqlScript, args, ctx)
// Add frame which represents SQL Script to the context.
- ctx.frames.addOne(new SqlScriptingExecutionFrame(executionPlan.getTreeIterator))
+ ctx.frames.append(new SqlScriptingExecutionFrame(executionPlan.getTreeIterator))
// Enter the scope of the top level compound.
// We don't need to exit this scope explicitly as it will be done automatically
// when the frame is removed during iteration.
@@ -50,32 +52,32 @@ class SqlScriptingExecution(
ctx
}
- private var current: Option[DataFrame] = getNextResult
-
- override def hasNext: Boolean = current.isDefined
-
- override def next(): DataFrame = {
- current match {
- case None => throw SparkException.internalError("No more elements to iterate through.")
- case Some(result) =>
- current = getNextResult
- result
- }
- }
/** Helper method to iterate get next statements from the first available frame. */
private def getNextStatement: Option[CompoundStatementExec] = {
+ // Remove frames that are already executed.
while (context.frames.nonEmpty && !context.frames.last.hasNext) {
context.frames.remove(context.frames.size - 1)
}
+ // If there are still frames available, get the next statement.
if (context.frames.nonEmpty) {
return Some(context.frames.last.next())
}
None
}
- /** Helper method to iterate through statements until next result statement is encountered. */
- private def getNextResult: Option[DataFrame] = {
+ /**
+ * Advances through the script and executes statements until a result statement or
+ * end of script is encountered.
+ *
+ * To know if there is result statement available, the method has to advance through script and
+ * execute statements until the result statement or end of script is encountered. For that reason
+ * the returned result must be executed before subsequent calls. Multiple calls without executing
+ * the intermediate results will lead to incorrect behavior.
+ *
+ * @return Result DataFrame if it is available, otherwise None.
+ */
+ def getNextResult: Option[DataFrame] = {
var currentStatement = getNextStatement
// While we don't have a result statement, execute the statements.
while (currentStatement.isDefined) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionContext.scala
index 5a2ef62e3bb7..94462ab828f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionContext.scala
@@ -63,7 +63,7 @@ class SqlScriptingExecutionFrame(
}
def enterScope(label: String): Unit = {
- scopes.addOne(new SqlScriptingExecutionScope(label))
+ scopes.append(new SqlScriptingExecutionScope(label))
}
def exitScope(label: String): Unit = {
@@ -76,6 +76,7 @@ class SqlScriptingExecutionFrame(
scopes.remove(scopes.length - 1)
}
+ // Remove the scope with the given label.
if (scopes.nonEmpty) {
scopes.remove(scopes.length - 1)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionNode.scala
index ee47491b803b..58cbfb0feb01 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionNode.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionNode.scala
@@ -181,7 +181,8 @@ class NoOpStatementExec extends LeafStatementExec {
* @param label
* Label set by user to CompoundBody or None otherwise.
* @param isScope
- * Flag that indicates whether Compound Body is scope or not.
+ * Flag indicating if the CompoundBody is a labeled scope.
+ * Scopes are used for grouping local variables and exception handlers.
* @param context
* SqlScriptingExecutionContext keeps the execution state of current script.
*/
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionSuite.scala
index bbeae942f9fe..5b5285ea1327 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.scripting
+import scala.collection.mutable.ListBuffer
+
import org.apache.spark.SparkConf
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.expressions.Expression
@@ -43,7 +45,15 @@ class SqlScriptingExecutionSuite extends QueryTest with SharedSparkSession {
args: Map[String, Expression] = Map.empty): Seq[Array[Row]] = {
val compoundBody = spark.sessionState.sqlParser.parsePlan(sqlText).asInstanceOf[CompoundBody]
val sse = new SqlScriptingExecution(compoundBody, spark, args)
- sse.map { df => df.collect() }.toList
+ val result: ListBuffer[Array[Row]] = ListBuffer.empty
+
+ var df = sse.getNextResult
+ while (df.isDefined) {
+ // Collect results from the current DataFrame.
+ result.append(df.get.collect())
+ df = sse.getNextResult
+ }
+ result.toSeq
}
private def verifySqlScriptResult(sqlText: String, expected: Seq[Seq[Row]]): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingInterpreterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingInterpreterSuite.scala
index 20997504b15e..c7439a8934d7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingInterpreterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingInterpreterSuite.scala
@@ -49,7 +49,7 @@ class SqlScriptingInterpreterSuite extends QueryTest with SharedSparkSession {
// Initialize context so scopes can be entered correctly.
val context = new SqlScriptingExecutionContext()
val executionPlan = interpreter.buildExecutionPlan(compoundBody, args, context)
- context.frames.addOne(new SqlScriptingExecutionFrame(executionPlan.getTreeIterator))
+ context.frames.append(new SqlScriptingExecutionFrame(executionPlan.getTreeIterator))
executionPlan.enterScope()
executionPlan.getTreeIterator.flatMap {