This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 9b43a9f3ea5 [SPARK-44361][SQL] Use PartitionEvaluator API in MapInBatchExec
9b43a9f3ea5 is described below
commit 9b43a9f3ea551a594835a4742f7b2d1fdb1cf518
Author: Vinod KC <[email protected]>
AuthorDate: Wed Jul 19 12:02:52 2023 +0800
[SPARK-44361][SQL] Use PartitionEvaluator API in MapInBatchExec
### What changes were proposed in this pull request?
The SQL operator `MapInBatchExec` is updated to use the `PartitionEvaluator` API for execution.
Added a new method `mapPartitionsWithEvaluator` in `RDDBarrier`.
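For illustration, a minimal sketch of how an evaluator factory plugs into this API (the factory, its records, and the `transform` helper below are hypothetical; only `PartitionEvaluator`, `PartitionEvaluatorFactory` and `mapPartitionsWithEvaluator` come from Spark):

```scala
import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory, SparkContext}

// Hypothetical factory: upper-cases every record of a partition.
class UpperCaseEvaluatorFactory extends PartitionEvaluatorFactory[String, String] {
  override def createEvaluator(): PartitionEvaluator[String, String] =
    new PartitionEvaluator[String, String] {
      override def eval(partitionIndex: Int, inputs: Iterator[String]*): Iterator[String] =
        // The factory is serialized to executors; each task builds its own evaluator.
        inputs.head.map(_.toUpperCase)
    }
}

// Works on plain RDDs and, with this change, on RDDBarrier as well.
def transform(sc: SparkContext) =
  sc.parallelize(Seq("a", "b"), 2).barrier()
    .mapPartitionsWithEvaluator(new UpperCaseEvaluatorFactory)
```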
### Why are the changes needed?
To avoid the use of lambdas during distributed execution.
See SPARK-43061 for more details.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing test cases. Once all SQL operators are refactored, `spark.sql.execution.usePartitionEvaluator`
will be enabled by default so that all tests cover this code path.
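As a reference, the new code path can already be exercised explicitly by enabling the flag in a session (a sketch; the flag stays off by default until the refactoring is complete):

```scala
// Opt in to PartitionEvaluator-based execution for the refactored SQL operators.
spark.conf.set("spark.sql.execution.usePartitionEvaluator", "true")
```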
Closes #42024 from vinodkc/br_SPARK-44361.
Authored-by: Vinod KC <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../scala/org/apache/spark/rdd/RDDBarrier.scala | 16 +++-
.../python/MapInBatchEvaluatorFactory.scala | 92 ++++++++++++++++++++++
.../sql/execution/python/MapInBatchExec.scala | 80 ++++++++-----------
3 files changed, 137 insertions(+), 51 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala b/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala
index b70ea0073c9..13ce8f1e1b5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala
@@ -19,8 +19,8 @@ package org.apache.spark.rdd
import scala.reflect.ClassTag
-import org.apache.spark.TaskContext
-import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.{PartitionEvaluatorFactory, TaskContext}
+import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
/**
* :: Experimental ::
@@ -76,5 +76,17 @@ class RDDBarrier[T: ClassTag] private[spark] (rdd: RDD[T]) {
)
}
+ /**
+ * Return a new RDD by applying an evaluator to each partition of the wrapped RDD. The given
+ * evaluator factory will be serialized and sent to executors, and each task will create an
+ * evaluator with the factory, and use the evaluator to transform the data of the input
+ * partition.
+ */
+ @DeveloperApi
+ @Since("3.5.0")
+ def mapPartitionsWithEvaluator[U: ClassTag](
+ evaluatorFactory: PartitionEvaluatorFactory[T, U]): RDD[U] = rdd.withScope {
+ new MapPartitionsWithEvaluatorRDD(rdd, evaluatorFactory)
+ }
// TODO: [SPARK-25247] add extra conf to RDDBarrier, e.g., timeout.
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchEvaluatorFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchEvaluatorFactory.scala
new file mode 100644
index 00000000000..efb063476a4
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchEvaluatorFactory.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{ContextAwareIterator, TaskContext}
+import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory}
+import org.apache.spark.api.python.{ChainedPythonFunctions}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch}
+
+class MapInBatchEvaluatorFactory(
+ output: Seq[Attribute],
+ chainedFunc: Seq[ChainedPythonFunctions],
+ outputTypes: StructType,
+ batchSize: Int,
+ pythonEvalType: Int,
+ sessionLocalTimeZone: String,
+ largeVarTypes: Boolean,
+ pythonRunnerConf: Map[String, String],
+ pythonMetrics: Map[String, SQLMetric],
+ jobArtifactUUID: Option[String])
+ extends PartitionEvaluatorFactory[InternalRow, InternalRow] {
+
+ override def createEvaluator(): PartitionEvaluator[InternalRow, InternalRow] =
+ new MapInBatchEvaluator
+
+ private class MapInBatchEvaluator extends PartitionEvaluator[InternalRow, InternalRow] {
+ override def eval(
+ partitionIndex: Int,
+ inputs: Iterator[InternalRow]*): Iterator[InternalRow] = {
+ assert(inputs.length == 1)
+ val inputIter = inputs.head
+ // Single function with one struct.
+ val argOffsets = Array(Array(0))
+ val context = TaskContext.get()
+ val contextAwareIterator = new ContextAwareIterator(context, inputIter)
+
+ // Here we wrap it via another row so that Python sides understand it
+ // as a DataFrame.
+ val wrappedIter = contextAwareIterator.map(InternalRow(_))
+
+ // DO NOT use iter.grouped(). See BatchIterator.
+ val batchIter =
+ if (batchSize > 0) new BatchIterator(wrappedIter, batchSize) else Iterator(wrappedIter)
+
+ val columnarBatchIter = new ArrowPythonRunner(
+ chainedFunc,
+ pythonEvalType,
+ argOffsets,
+ StructType(Array(StructField("struct", outputTypes))),
+ sessionLocalTimeZone,
+ largeVarTypes,
+ pythonRunnerConf,
+ pythonMetrics,
+ jobArtifactUUID).compute(batchIter, context.partitionId(), context)
+
+ val unsafeProj = UnsafeProjection.create(output, output)
+
+ columnarBatchIter
+ .flatMap { batch =>
+ // Scalar Iterator UDF returns a StructType column in ColumnarBatch, select
+ // the children here
+ val structVector = batch.column(0).asInstanceOf[ArrowColumnVector]
+ val outputVectors = output.indices.map(structVector.getChild)
+ val flattenedBatch = new ColumnarBatch(outputVectors.toArray)
+ flattenedBatch.setNumRows(batch.numRows())
+ flattenedBatch.rowIterator.asScala
+ }
+ .map(unsafeProj)
+ }
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala
index b4af3db3c83..0703f57c33d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala
@@ -17,18 +17,14 @@
package org.apache.spark.sql.execution.python
-import scala.collection.JavaConverters._
-
-import org.apache.spark.{ContextAwareIterator, JobArtifactSet, TaskContext}
+import org.apache.spark.JobArtifactSet
import org.apache.spark.api.python.ChainedPythonFunctions
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.UnaryExecNode
-import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.util.ArrowUtils
-import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch}
/**
* A relation produced by applying a function that takes an iterator of batches
@@ -56,53 +52,39 @@ trait MapInBatchExec extends UnaryExecNode with PythonSQLMetrics {
override def outputPartitioning: Partitioning = child.outputPartitioning
override protected def doExecute(): RDD[InternalRow] = {
- def mapper(inputIter: Iterator[InternalRow]): Iterator[InternalRow] = {
- // Single function with one struct.
- val argOffsets = Array(Array(0))
- val chainedFunc = Seq(ChainedPythonFunctions(Seq(pythonFunction)))
- val sessionLocalTimeZone = conf.sessionLocalTimeZone
- val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf)
- val outputTypes = child.schema
-
- val context = TaskContext.get()
- val contextAwareIterator = new ContextAwareIterator(context, inputIter)
-
- // Here we wrap it via another row so that Python sides understand it
- // as a DataFrame.
- val wrappedIter = contextAwareIterator.map(InternalRow(_))
-
- // DO NOT use iter.grouped(). See BatchIterator.
- val batchIter =
- if (batchSize > 0) new BatchIterator(wrappedIter, batchSize) else Iterator(wrappedIter)
-
- val columnarBatchIter = new ArrowPythonRunner(
- chainedFunc,
- pythonEvalType,
- argOffsets,
- StructType(Array(StructField("struct", outputTypes))),
- sessionLocalTimeZone,
- largeVarTypes,
- pythonRunnerConf,
- pythonMetrics,
- jobArtifactUUID).compute(batchIter, context.partitionId(), context)
-
- val unsafeProj = UnsafeProjection.create(output, output)
-
- columnarBatchIter.flatMap { batch =>
- // Scalar Iterator UDF returns a StructType column in ColumnarBatch, select
- // the children here
- val structVector = batch.column(0).asInstanceOf[ArrowColumnVector]
- val outputVectors = output.indices.map(structVector.getChild)
- val flattenedBatch = new ColumnarBatch(outputVectors.toArray)
- flattenedBatch.setNumRows(batch.numRows())
- flattenedBatch.rowIterator.asScala
- }.map(unsafeProj)
- }
+ val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf)
+ val pythonFunction = func.asInstanceOf[PythonUDF].func
+ val chainedFunc = Seq(ChainedPythonFunctions(Seq(pythonFunction)))
+ val evaluatorFactory = new MapInBatchEvaluatorFactory(
+ output,
+ chainedFunc,
+ child.schema,
+ conf.arrowMaxRecordsPerBatch,
+ pythonEvalType,
+ conf.sessionLocalTimeZone,
+ conf.arrowUseLargeVarTypes,
+ pythonRunnerConf,
+ pythonMetrics,
+ jobArtifactUUID)
if (isBarrier) {
- child.execute().barrier().mapPartitions(mapper)
+ val rddBarrier = child.execute().barrier()
+ if (conf.usePartitionEvaluator) {
+ rddBarrier.mapPartitionsWithEvaluator(evaluatorFactory)
+ } else {
+ rddBarrier.mapPartitions { iter =>
+ evaluatorFactory.createEvaluator().eval(0, iter)
+ }
+ }
} else {
- child.execute().mapPartitionsInternal(mapper)
+ val inputRdd = child.execute()
+ if (conf.usePartitionEvaluator) {
+ inputRdd.mapPartitionsWithEvaluator(evaluatorFactory)
+ } else {
+ inputRdd.mapPartitionsInternal { iter =>
+ evaluatorFactory.createEvaluator().eval(0, iter)
+ }
+ }
}
}
}