This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fb8c74b8d6e [SPARK-44411][SQL] Use PartitionEvaluator API in 
ArrowEvalPythonExec and  BatchEvalPythonExec
fb8c74b8d6e is described below

commit fb8c74b8d6e46df40bb2d8ec33589406f64dcb02
Author: Vinod KC <[email protected]>
AuthorDate: Wed Jul 19 11:58:22 2023 +0800

    [SPARK-44411][SQL] Use PartitionEvaluator API in ArrowEvalPythonExec and  
BatchEvalPythonExec
    
    ### What changes were proposed in this pull request?
    
    SQL operators `ArrowEvalPythonExec` & `BatchEvalPythonExec` are updated to 
use the `PartitionEvaluator` API to do execution.
    
    ### Why are the changes needed?
    
    To avoid the use of lambda during distributed execution.
    Ref: SPARK-43061 for more details.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing test cases. Once all SQL operators are refactored, will enable 
`spark.sql.execution.usePartitionEvaluator` by default, so all tests cover this 
code path.
    
    Closes #41998 from vinodkc/br_SPARK-44411_EvalPython.
    
    Authored-by: Vinod KC <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../sql/execution/python/ArrowEvalPythonExec.scala | 43 +++++++---
 .../sql/execution/python/BatchEvalPythonExec.scala | 29 +++++--
 ...Exec.scala => EvalPythonEvaluatorFactory.scala} | 91 ++++++++--------------
 .../sql/execution/python/EvalPythonExec.scala      | 84 ++------------------
 4 files changed, 98 insertions(+), 149 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
index 2e25ee2ba74..0e4a420b4b3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
@@ -24,6 +24,7 @@ import org.apache.spark.api.python.ChainedPythonFunctions
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.ArrowUtils
 
@@ -63,20 +64,47 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], 
resultAttrs: Seq[Attribute]
     evalType: Int)
   extends EvalPythonExec with PythonSQLMetrics {
 
-  private val batchSize = conf.arrowMaxRecordsPerBatch
-  private val sessionLocalTimeZone = conf.sessionLocalTimeZone
-  private val largeVarTypes = conf.arrowUseLargeVarTypes
-  private val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf)
   private[this] val jobArtifactUUID = 
JobArtifactSet.getCurrentJobArtifactState.map(_.uuid)
 
-  protected override def evaluate(
+  override protected def evaluatorFactory: EvalPythonEvaluatorFactory = {
+    new ArrowEvalPythonEvaluatorFactory(
+      child.output,
+      udfs,
+      output,
+      conf.arrowMaxRecordsPerBatch,
+      evalType,
+      conf.sessionLocalTimeZone,
+      conf.arrowUseLargeVarTypes,
+      ArrowUtils.getPythonRunnerConfMap(conf),
+      pythonMetrics,
+      jobArtifactUUID)
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
+    copy(child = newChild)
+}
+
+class ArrowEvalPythonEvaluatorFactory(
+    childOutput: Seq[Attribute],
+    udfs: Seq[PythonUDF],
+    output: Seq[Attribute],
+    batchSize: Int,
+    evalType: Int,
+    sessionLocalTimeZone: String,
+    largeVarTypes: Boolean,
+    pythonRunnerConf: Map[String, String],
+    pythonMetrics: Map[String, SQLMetric],
+    jobArtifactUUID: Option[String])
+    extends EvalPythonEvaluatorFactory(childOutput, udfs, output) {
+
+  override def evaluate(
       funcs: Seq[ChainedPythonFunctions],
       argOffsets: Array[Array[Int]],
       iter: Iterator[InternalRow],
       schema: StructType,
       context: TaskContext): Iterator[InternalRow] = {
 
-    val outputTypes = output.drop(child.output.length).map(_.dataType)
+    val outputTypes = output.drop(childOutput.length).map(_.dataType)
 
     // DO NOT use iter.grouped(). See BatchIterator.
     val batchIter = if (batchSize > 0) new BatchIterator(iter, batchSize) else 
Iterator(iter)
@@ -99,7 +127,4 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], 
resultAttrs: Seq[Attribute]
       batch.rowIterator.asScala
     }
   }
-
-  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
-    copy(child = newChild)
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
index 71f1610bcec..1de8f55d84b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
@@ -26,6 +26,7 @@ import org.apache.spark.api.python.{ChainedPythonFunctions, 
PythonEvalType}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.{StructField, StructType}
 
 /**
@@ -36,13 +37,34 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], 
resultAttrs: Seq[Attribute]
 
   private[this] val jobArtifactUUID = 
JobArtifactSet.getCurrentJobArtifactState.map(_.uuid)
 
-  protected override def evaluate(
+  override protected def evaluatorFactory: EvalPythonEvaluatorFactory = {
+    new BatchEvalPythonEvaluatorFactory(
+      child.output,
+      udfs,
+      output,
+      pythonMetrics,
+      jobArtifactUUID)
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): 
BatchEvalPythonExec =
+    copy(child = newChild)
+}
+
+class BatchEvalPythonEvaluatorFactory(
+    childOutput: Seq[Attribute],
+    udfs: Seq[PythonUDF],
+    output: Seq[Attribute],
+    pythonMetrics: Map[String, SQLMetric],
+    jobArtifactUUID: Option[String])
+    extends EvalPythonEvaluatorFactory(childOutput, udfs, output) {
+
+  override def evaluate(
       funcs: Seq[ChainedPythonFunctions],
       argOffsets: Array[Array[Int]],
       iter: Iterator[InternalRow],
       schema: StructType,
       context: TaskContext): Iterator[InternalRow] = {
-    EvaluatePython.registerPicklers()  // register pickler for Row
+    EvaluatePython.registerPicklers() // register pickler for Row
 
     // Input iterator to Python.
     val inputIterator = BatchEvalPythonExec.getInputIterator(iter, schema)
@@ -77,9 +99,6 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], 
resultAttrs: Seq[Attribute]
       }
     }
   }
-
-  override protected def withNewChildInternal(newChild: SparkPlan): 
BatchEvalPythonExec =
-    copy(child = newChild)
 }
 
 object BatchEvalPythonExec {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonEvaluatorFactory.scala
similarity index 53%
copy from 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
copy to 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonEvaluatorFactory.scala
index 70bcbe77fba..10bb3a45be9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonEvaluatorFactory.scala
@@ -21,61 +21,18 @@ import java.io.File
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.{ContextAwareIterator, SparkEnv, TaskContext}
+import org.apache.spark.{ContextAwareIterator, PartitionEvaluator, 
PartitionEvaluatorFactory, SparkEnv, TaskContext}
 import org.apache.spark.api.python.ChainedPythonFunctions
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.UnaryExecNode
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
 import org.apache.spark.util.Utils
 
-
-/**
- * A physical plan that evaluates a [[PythonUDF]], one partition of tuples at 
a time.
- *
- * Python evaluation works by sending the necessary (projected) input data via 
a socket to an
- * external Python process, and combine the result from the Python process 
with the original row.
- *
- * For each row we send to Python, we also put it in a queue first. For each 
output row from Python,
- * we drain the queue to find the original input row. Note that if the Python 
process is way too
- * slow, this could lead to the queue growing unbounded and spill into disk 
when run out of memory.
- *
- * Here is a diagram to show how this works:
- *
- *            Downstream (for parent)
- *             /      \
- *            /     socket  (output of UDF)
- *           /         \
- *        RowQueue    Python
- *           \         /
- *            \     socket  (input of UDF)
- *             \     /
- *          upstream (from child)
- *
- * The rows sent to and received from Python are packed into batches (100 
rows) and serialized,
- * there should be always some rows buffered in the socket or Python process, 
so the pulling from
- * RowQueue ALWAYS happened after pushing into it.
- */
-trait EvalPythonExec extends UnaryExecNode {
-  def udfs: Seq[PythonUDF]
-  def resultAttrs: Seq[Attribute]
-
-  override def output: Seq[Attribute] = child.output ++ resultAttrs
-
-  override def producedAttributes: AttributeSet = AttributeSet(resultAttrs)
-
-  private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, 
Seq[Expression]) = {
-    udf.children match {
-      case Seq(u: PythonUDF) =>
-        val (chained, children) = collectFunctions(u)
-        (ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
-      case children =>
-        // There should not be any other UDFs, or the children can't be 
evaluated directly.
-        assert(children.forall(!_.exists(_.isInstanceOf[PythonUDF])))
-        (ChainedPythonFunctions(Seq(udf.func)), udf.children)
-    }
-  }
+abstract class EvalPythonEvaluatorFactory(
+    childOutput: Seq[Attribute],
+    udfs: Seq[PythonUDF],
+    output: Seq[Attribute])
+    extends PartitionEvaluatorFactory[InternalRow, InternalRow] {
 
   protected def evaluate(
       funcs: Seq[ChainedPythonFunctions],
@@ -84,17 +41,35 @@ trait EvalPythonExec extends UnaryExecNode {
       schema: StructType,
       context: TaskContext): Iterator[InternalRow]
 
-  protected override def doExecute(): RDD[InternalRow] = {
-    val inputRDD = child.execute().map(_.copy())
-
-    inputRDD.mapPartitions { iter =>
+  override def createEvaluator(): PartitionEvaluator[InternalRow, InternalRow] 
=
+    new EvalPythonPartitionEvaluator
+
+  private class EvalPythonPartitionEvaluator
+      extends PartitionEvaluator[InternalRow, InternalRow] {
+    private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, 
Seq[Expression]) = {
+      udf.children match {
+        case Seq(u: PythonUDF) =>
+          val (chained, children) = collectFunctions(u)
+          (ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
+        case children =>
+          // There should not be any other UDFs, or the children can't be 
evaluated directly.
+          assert(children.forall(!_.exists(_.isInstanceOf[PythonUDF])))
+          (ChainedPythonFunctions(Seq(udf.func)), udf.children)
+      }
+    }
+    override def eval(
+        partitionIndex: Int,
+        iters: Iterator[InternalRow]*): Iterator[InternalRow] = {
+      val iter = iters.head
       val context = TaskContext.get()
       val contextAwareIterator = new ContextAwareIterator(context, iter)
 
       // The queue used to buffer input rows so we can drain it to
       // combine input with output from Python.
-      val queue = HybridRowQueue(context.taskMemoryManager(),
-        new File(Utils.getLocalDir(SparkEnv.get.conf)), child.output.length)
+      val queue = HybridRowQueue(
+        context.taskMemoryManager(),
+        new File(Utils.getLocalDir(SparkEnv.get.conf)),
+        childOutput.length)
       context.addTaskCompletionListener[Unit] { ctx =>
         queue.close()
       }
@@ -115,7 +90,7 @@ trait EvalPythonExec extends UnaryExecNode {
           }
         }.toArray
       }.toArray
-      val projection = MutableProjection.create(allInputs.toSeq, child.output)
+      val projection = MutableProjection.create(allInputs.toSeq, childOutput)
       projection.initialize(context.partitionId())
       val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) =>
         StructField(s"_$i", dt)
@@ -127,8 +102,8 @@ trait EvalPythonExec extends UnaryExecNode {
         projection(inputRow)
       }
 
-      val outputRowIterator = evaluate(
-        pyFuncs, argOffsets, projectedRowIter, schema, context)
+      val outputRowIterator =
+        evaluate(pyFuncs, argOffsets, projectedRowIter, schema, context)
 
       val joined = new JoinedRow
       val resultProj = UnsafeProjection.create(output, output)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
index 70bcbe77fba..1c8b0f2228f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
@@ -17,19 +17,10 @@
 
 package org.apache.spark.sql.execution.python
 
-import java.io.File
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.{ContextAwareIterator, SparkEnv, TaskContext}
-import org.apache.spark.api.python.ChainedPythonFunctions
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.UnaryExecNode
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
-import org.apache.spark.util.Utils
-
 
 /**
  * A physical plan that evaluates a [[PythonUDF]], one partition of tuples at 
a time.
@@ -61,80 +52,19 @@ trait EvalPythonExec extends UnaryExecNode {
   def udfs: Seq[PythonUDF]
   def resultAttrs: Seq[Attribute]
 
+  protected def evaluatorFactory: EvalPythonEvaluatorFactory
+
   override def output: Seq[Attribute] = child.output ++ resultAttrs
 
   override def producedAttributes: AttributeSet = AttributeSet(resultAttrs)
 
-  private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, 
Seq[Expression]) = {
-    udf.children match {
-      case Seq(u: PythonUDF) =>
-        val (chained, children) = collectFunctions(u)
-        (ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
-      case children =>
-        // There should not be any other UDFs, or the children can't be 
evaluated directly.
-        assert(children.forall(!_.exists(_.isInstanceOf[PythonUDF])))
-        (ChainedPythonFunctions(Seq(udf.func)), udf.children)
-    }
-  }
-
-  protected def evaluate(
-      funcs: Seq[ChainedPythonFunctions],
-      argOffsets: Array[Array[Int]],
-      iter: Iterator[InternalRow],
-      schema: StructType,
-      context: TaskContext): Iterator[InternalRow]
-
   protected override def doExecute(): RDD[InternalRow] = {
     val inputRDD = child.execute().map(_.copy())
-
-    inputRDD.mapPartitions { iter =>
-      val context = TaskContext.get()
-      val contextAwareIterator = new ContextAwareIterator(context, iter)
-
-      // The queue used to buffer input rows so we can drain it to
-      // combine input with output from Python.
-      val queue = HybridRowQueue(context.taskMemoryManager(),
-        new File(Utils.getLocalDir(SparkEnv.get.conf)), child.output.length)
-      context.addTaskCompletionListener[Unit] { ctx =>
-        queue.close()
-      }
-
-      val (pyFuncs, inputs) = udfs.map(collectFunctions).unzip
-
-      // flatten all the arguments
-      val allInputs = new ArrayBuffer[Expression]
-      val dataTypes = new ArrayBuffer[DataType]
-      val argOffsets = inputs.map { input =>
-        input.map { e =>
-          if (allInputs.exists(_.semanticEquals(e))) {
-            allInputs.indexWhere(_.semanticEquals(e))
-          } else {
-            allInputs += e
-            dataTypes += e.dataType
-            allInputs.length - 1
-          }
-        }.toArray
-      }.toArray
-      val projection = MutableProjection.create(allInputs.toSeq, child.output)
-      projection.initialize(context.partitionId())
-      val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) =>
-        StructField(s"_$i", dt)
-      }.toArray)
-
-      // Add rows to queue to join later with the result.
-      val projectedRowIter = contextAwareIterator.map { inputRow =>
-        queue.add(inputRow.asInstanceOf[UnsafeRow])
-        projection(inputRow)
-      }
-
-      val outputRowIterator = evaluate(
-        pyFuncs, argOffsets, projectedRowIter, schema, context)
-
-      val joined = new JoinedRow
-      val resultProj = UnsafeProjection.create(output, output)
-
-      outputRowIterator.map { outputRow =>
-        resultProj(joined(queue.remove(), outputRow))
+    if (conf.usePartitionEvaluator) {
+      inputRDD.mapPartitionsWithEvaluator(evaluatorFactory)
+    } else {
+      inputRDD.mapPartitionsWithIndexInternal { (index, iter) =>
+        evaluatorFactory.createEvaluator().eval(index, iter)
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to