This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d7128c32d64 [SPARK-44375][SQL] Use PartitionEvaluator API in DebugExec
d7128c32d64 is described below
commit d7128c32d645cdd795cae4b4120049007f1e50de
Author: Jia Fan <[email protected]>
AuthorDate: Wed Jul 19 11:52:18 2023 +0800
[SPARK-44375][SQL] Use PartitionEvaluator API in DebugExec
### What changes were proposed in this pull request?
Use the `PartitionEvaluator` API in `DebugExec`.
### Why are the changes needed?
Use the new `PartitionEvaluator` API to avoid lambda-based distributed execution.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a new test.
Closes #41949 from Hisoka-X/SPARK-44375_debug_exec.
Authored-by: Jia Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../execution/debug/DebugEvaluatorFactory.scala | 58 ++++++++++++++++++++
.../apache/spark/sql/execution/debug/package.scala | 63 +++++++++++-----------
2 files changed, 88 insertions(+), 33 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/DebugEvaluatorFactory.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/DebugEvaluatorFactory.scala
new file mode 100644
index 00000000000..f40e998d12f
--- /dev/null
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/DebugEvaluatorFactory.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.debug
+
+import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.util.LongAccumulator
+/**
+ * Factory that creates the partition evaluator used by `DebugExec.doExecute`.
+ *
+ * The evaluator wraps a partition's row iterator, returns every row as-is, and
+ * updates the supplied accumulators as each row is pulled.
+ *
+ * @param tupleCount accumulator incremented by one for every row returned by `next()`
+ * @param numColumns number of leading columns inspected in each row
+ * @param columnAccumulator one accumulator per inspected column, indexed by column
+ *                          position; receives the runtime class name of each non-null value
+ * @param output attributes whose `dataType` is used to read the column at the same index
+ */
+class DebugEvaluatorFactory(
+ tupleCount: LongAccumulator,
+ numColumns: Int,
+ columnAccumulator: Array[SetAccumulator[String]],
+ output: Seq[Attribute]) extends PartitionEvaluatorFactory[InternalRow,
InternalRow] {
+ override def createEvaluator(): PartitionEvaluator[InternalRow, InternalRow]
= {
+ new DebugEvaluator() // a fresh evaluator instance on every call
+ }
+ /** Pass-through evaluator: yields the input rows themselves while recording metrics. */
+ class DebugEvaluator extends PartitionEvaluator[InternalRow, InternalRow] {
+ override def eval(
+ partitionIndex: Int,
+ inputs: Iterator[InternalRow]*): Iterator[InternalRow] = {
+ val input = inputs.head // NOTE(review): only the first input iterator is read; presumably exactly one is passed — confirm against callers
+ new Iterator[InternalRow] {
+ def hasNext: Boolean = input.hasNext
+
+ def next(): InternalRow = { // metrics are updated here, i.e. only for rows actually pulled
+ val currentRow = input.next()
+ tupleCount.add(1)
+ var i = 0
+ while (i < numColumns) {
+ val value = currentRow.get(i, output(i).dataType)
+ if (value != null) { // null values contribute no type name
+ columnAccumulator(i).add(value.getClass.getName) // runtime class name of the value
+ }
+ i += 1
+ }
+ currentRow
+ }
+ }
+ }
+ }
+}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index 6f796e6ca94..b4bb6aba15d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -206,25 +206,32 @@ package object debug {
}
}
- case class DebugExec(child: SparkPlan) extends UnaryExecNode with
CodegenSupport {
- def output: Seq[Attribute] = child.output
- class SetAccumulator[T] extends AccumulatorV2[T, java.util.Set[T]] {
- private val _set = Collections.synchronizedSet(new
java.util.HashSet[T]())
- override def isZero: Boolean = _set.isEmpty
- override def copy(): AccumulatorV2[T, java.util.Set[T]] = {
- val newAcc = new SetAccumulator[T]()
- newAcc._set.addAll(_set)
- newAcc
- }
- override def reset(): Unit = _set.clear()
- override def add(v: T): Unit = _set.add(v)
- override def merge(other: AccumulatorV2[T, java.util.Set[T]]): Unit = {
- _set.addAll(other.value)
- }
- override def value: java.util.Set[T] = _set
+ class SetAccumulator[T] extends AccumulatorV2[T, java.util.Set[T]] { // accumulator that gathers added values into a java.util.Set
+ private val _set = Collections.synchronizedSet(new java.util.HashSet[T]()) // synchronized wrapper around a HashSet
+ /** True while the backing set is empty. */
+ override def isZero: Boolean = _set.isEmpty
+ /** Returns a new accumulator pre-filled with this one's current elements. */
+ override def copy(): AccumulatorV2[T, java.util.Set[T]] = {
+ val newAcc = new SetAccumulator[T]()
+ newAcc._set.addAll(_set)
+ newAcc
+ }
+ /** Empties the backing set. */
+ override def reset(): Unit = _set.clear()
+ /** Adds a single value; a value already present leaves the set unchanged. */
+ override def add(v: T): Unit = _set.add(v)
+ /** Adds every element of `other`'s value to this accumulator's set. */
+ override def merge(other: AccumulatorV2[T, java.util.Set[T]]): Unit = {
+ _set.addAll(other.value)
}
+ override def value: java.util.Set[T] = _set // exposes the backing set itself, not a copy
+ }
+
+ case class DebugExec(child: SparkPlan) extends UnaryExecNode with
CodegenSupport {
+ def output: Seq[Attribute] = child.output
+
/**
* A collection of metrics for each column of output.
*/
@@ -250,23 +257,13 @@ package object debug {
}
protected override def doExecute(): RDD[InternalRow] = {
- child.execute().mapPartitions { iter =>
- new Iterator[InternalRow] {
- def hasNext: Boolean = iter.hasNext
-
- def next(): InternalRow = {
- val currentRow = iter.next()
- tupleCount.add(1)
- var i = 0
- while (i < numColumns) {
- val value = currentRow.get(i, output(i).dataType)
- if (value != null) {
- columnStats(i).elementTypes.add(value.getClass.getName)
- }
- i += 1
- }
- currentRow
- }
+ val evaluatorFactory = new DebugEvaluatorFactory(tupleCount, numColumns,
+ columnStats.map(_.elementTypes), output)
+ if (conf.usePartitionEvaluator) {
+ child.execute().mapPartitionsWithEvaluator(evaluatorFactory)
+ } else {
+ child.execute().mapPartitionsWithIndex { (index, iter) =>
+ evaluatorFactory.createEvaluator().eval(index, iter)
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]