This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 3fb450c [SPARK-31096][SQL] Replace `Array` with `Seq` in AQE `CustomShuffleReaderExec`
3fb450c is described below
commit 3fb450c0b7c4e415a29d51cd10e6be6ad8dff114
Author: maryannxue <[email protected]>
AuthorDate: Tue Mar 10 14:15:44 2020 +0800
[SPARK-31096][SQL] Replace `Array` with `Seq` in AQE `CustomShuffleReaderExec`
### What changes were proposed in this pull request?
This PR changes the type of `CustomShuffleReaderExec`'s `partitionSpecs` from `Array` to `Seq`, since `Array` compares references rather than values for equality, which could lead to potential plan-reuse problems.
### Why are the changes needed?
Unlike `Seq`, `Array` compares references rather than values for equality, so two structurally identical `CustomShuffleReaderExec` nodes would not be recognized as equal, which could lead to potential plan-reuse problems.
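For illustration only (not part of this patch), a minimal Scala sketch of the equality behavior that motivates the change:

```scala
// Array inherits reference equality from JVM arrays, so two Arrays with
// identical contents are not `==` to each other. Case-class equality is
// field-by-field `==`, so two structurally identical CustomShuffleReaderExec
// nodes holding separate Array instances would never compare as equal.
Array(1, 2, 3) == Array(1, 2, 3)             // false: compares references
Array(1, 2, 3).sameElements(Array(1, 2, 3))  // true: explicit element-wise check

// Seq implementations compare element by element, so equality over a Seq
// field behaves as expected and equivalent plans can be recognized as equal.
Seq(1, 2, 3) == Seq(1, 2, 3)                 // true: compares values
```

The `ShuffledRowRDD` constructor still takes an `Array`, hence the `partitionSpecs.toArray` conversion at the call site in the diff below.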
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Passes existing UTs.
Closes #27857 from maryannxue/aqe-customreader-fix.
Authored-by: maryannxue <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../adaptive/CustomShuffleReaderExec.scala | 4 +--
.../adaptive/OptimizeLocalShuffleReader.scala | 10 +++---
.../execution/adaptive/OptimizeSkewedJoin.scala | 12 +++-----
.../adaptive/ShufflePartitionsCoalescer.scala | 6 ++--
.../ShufflePartitionsCoalescerSuite.scala | 36 +++++++++++-----------
5 files changed, 33 insertions(+), 35 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
index be372bb..ba3f725 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExcha
*/
case class CustomShuffleReaderExec private(
child: SparkPlan,
- partitionSpecs: Array[ShufflePartitionSpec],
+ partitionSpecs: Seq[ShufflePartitionSpec],
description: String) extends UnaryExecNode {
override def output: Seq[Attribute] = child.output
@@ -71,7 +71,7 @@ case class CustomShuffleReaderExec private(
cachedShuffleRDD = child match {
case stage: ShuffleQueryStageExec =>
new ShuffledRowRDD(
- stage.shuffle.shuffleDependency, stage.shuffle.readMetrics, partitionSpecs)
+ stage.shuffle.shuffleDependency, stage.shuffle.readMetrics, partitionSpecs.toArray)
case _ =>
throw new IllegalStateException("operating on canonicalization plan")
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
index e441763..fb6b40c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
@@ -77,21 +77,21 @@ case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
// partition start indices based on block size to avoid data skew.
private def getPartitionSpecs(
shuffleStage: ShuffleQueryStageExec,
- advisoryParallelism: Option[Int]): Array[ShufflePartitionSpec] = {
+ advisoryParallelism: Option[Int]): Seq[ShufflePartitionSpec] = {
val shuffleDep = shuffleStage.shuffle.shuffleDependency
val numReducers = shuffleDep.partitioner.numPartitions
val expectedParallelism = advisoryParallelism.getOrElse(numReducers)
val numMappers = shuffleDep.rdd.getNumPartitions
val splitPoints = if (numMappers == 0) {
- Array.empty
+ Seq.empty
} else {
- equallyDivide(numReducers, math.max(1, expectedParallelism / numMappers)).toArray
+ equallyDivide(numReducers, math.max(1, expectedParallelism / numMappers))
}
(0 until numMappers).flatMap { mapIndex =>
(splitPoints :+ numReducers).sliding(2).map {
- case Array(start, end) => PartialMapperPartitionSpec(mapIndex, start, end)
+ case Seq(start, end) => PartialMapperPartitionSpec(mapIndex, start, end)
}
- }.toArray
+ }
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
index 2e8adcf..c3bcce4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
@@ -108,7 +108,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
private def getMapStartIndices(
stage: ShuffleQueryStageExec,
partitionId: Int,
- targetSize: Long): Array[Int] = {
+ targetSize: Long): Seq[Int] = {
val shuffleId = stage.shuffle.shuffleDependency.shuffleHandle.shuffleId
val mapPartitionSizes = getMapSizesForReduceId(shuffleId, partitionId)
val partitionStartIndices = ArrayBuffer[Int]()
@@ -126,7 +126,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
i += 1
}
- partitionStartIndices.toArray
+ partitionStartIndices
}
private def getStatistics(stage: ShuffleQueryStageExec): MapOutputStatistics = {
@@ -255,10 +255,8 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
logDebug("number of skewed partitions: " +
s"left ${leftSkewDesc.numPartitions}, right
${rightSkewDesc.numPartitions}")
if (leftSkewDesc.numPartitions > 0 || rightSkewDesc.numPartitions > 0) {
- val newLeft = CustomShuffleReaderExec(
- left, leftSidePartitions.toArray, leftSkewDesc.toString)
- val newRight = CustomShuffleReaderExec(
- right, rightSidePartitions.toArray, rightSkewDesc.toString)
+ val newLeft = CustomShuffleReaderExec(left, leftSidePartitions, leftSkewDesc.toString)
+ val newRight = CustomShuffleReaderExec(right, rightSidePartitions, rightSkewDesc.toString)
smj.copy(
left = s1.copy(child = newLeft), right = s2.copy(child = newRight), isSkewJoin = true)
} else {
@@ -286,7 +284,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
private def createSkewPartitions(
reducerIndex: Int,
- mapStartIndices: Array[Int],
+ mapStartIndices: Seq[Int],
numMappers: Int): Seq[PartialReducerPartitionSpec] = {
mapStartIndices.indices.map { i =>
val startMapIndex = mapStartIndices(i)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsCoalescer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsCoalescer.scala
index c3b8bf6..8c58241 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsCoalescer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsCoalescer.scala
@@ -47,7 +47,7 @@ object ShufflePartitionsCoalescer extends Logging {
* - coalesced partition 2: shuffle partition 2 (size 170 MiB)
* - coalesced partition 3: shuffle partition 3 and 4 (size 50 MiB)
*
- * @return An array of [[CoalescedPartitionSpec]]s. For example, if partitions [0, 1, 2, 3, 4]
+ * @return A sequence of [[CoalescedPartitionSpec]]s. For example, if partitions [0, 1, 2, 3, 4]
* split at indices [0, 2, 3], the returned partition specs will be:
* CoalescedPartitionSpec(0, 2), CoalescedPartitionSpec(2, 3) and
* CoalescedPartitionSpec(3, 5).
@@ -57,7 +57,7 @@ object ShufflePartitionsCoalescer extends Logging {
firstPartitionIndex: Int,
lastPartitionIndex: Int,
advisoryTargetSize: Long,
- minNumPartitions: Int = 1): Array[ShufflePartitionSpec] = {
+ minNumPartitions: Int = 1): Seq[ShufflePartitionSpec] = {
// If `minNumPartitions` is very large, it is possible that we need to use a value less than
// `advisoryTargetSize` as the target size of a coalesced task.
val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum
@@ -112,6 +112,6 @@ object ShufflePartitionsCoalescer extends Logging {
}
partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, lastPartitionIndex)
- partitionSpecs.toArray
+ partitionSpecs
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsCoalescerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsCoalescerSuite.scala
index 0befa06..8aab299 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsCoalescerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsCoalescerSuite.scala
@@ -24,7 +24,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite {
private def checkEstimation(
bytesByPartitionIdArray: Array[Array[Long]],
- expectedPartitionStartIndices: Array[CoalescedPartitionSpec],
+ expectedPartitionStartIndices: Seq[CoalescedPartitionSpec],
targetSize: Long,
minNumPartitions: Int = 1): Unit = {
val mapOutputStatistics = bytesByPartitionIdArray.zipWithIndex.map {
@@ -46,7 +46,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite {
{
// All bytes per partition are 0.
val bytesByPartitionId = Array[Long](0, 0, 0, 0, 0)
- val expectedPartitionSpecs = Array(CoalescedPartitionSpec(0, 5))
+ val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 5))
checkEstimation(Array(bytesByPartitionId), expectedPartitionSpecs, targetSize)
}
@@ -54,21 +54,21 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite {
// Some bytes per partition are 0 and total size is less than the target size.
// 1 coalesced partition is expected.
val bytesByPartitionId = Array[Long](10, 0, 20, 0, 0)
- val expectedPartitionSpecs = Array(CoalescedPartitionSpec(0, 5))
+ val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 5))
checkEstimation(Array(bytesByPartitionId), expectedPartitionSpecs, targetSize)
}
{
// 2 coalesced partitions are expected.
val bytesByPartitionId = Array[Long](10, 0, 90, 20, 0)
- val expectedPartitionSpecs = Array(CoalescedPartitionSpec(0, 3), CoalescedPartitionSpec(3, 5))
+ val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 3), CoalescedPartitionSpec(3, 5))
checkEstimation(Array(bytesByPartitionId), expectedPartitionSpecs, targetSize)
}
{
// There are a few large shuffle partitions.
val bytesByPartitionId = Array[Long](110, 10, 100, 110, 0)
- val expectedPartitionSpecs = Array(
+ val expectedPartitionSpecs = Seq(
CoalescedPartitionSpec(0, 1),
CoalescedPartitionSpec(1, 2),
CoalescedPartitionSpec(2, 3),
@@ -80,7 +80,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite {
{
// All shuffle partitions are larger than the targeted size.
val bytesByPartitionId = Array[Long](100, 110, 100, 110, 110)
- val expectedPartitionSpecs = Array(
+ val expectedPartitionSpecs = Seq(
CoalescedPartitionSpec(0, 1),
CoalescedPartitionSpec(1, 2),
CoalescedPartitionSpec(2, 3),
@@ -92,7 +92,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite {
{
// The last shuffle partition is in a single coalesced partition.
val bytesByPartitionId = Array[Long](30, 30, 0, 40, 110)
- val expectedPartitionSpecs = Array(CoalescedPartitionSpec(0, 4), CoalescedPartitionSpec(4, 5))
+ val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 4), CoalescedPartitionSpec(4, 5))
checkEstimation(Array(bytesByPartitionId), expectedPartitionSpecs, targetSize)
}
}
@@ -106,7 +106,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite {
val bytesByPartitionId1 = Array[Long](0, 0, 0, 0, 0)
val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0, 0)
intercept[AssertionError] {
- checkEstimation(Array(bytesByPartitionId1, bytesByPartitionId2), Array.empty, targetSize)
+ checkEstimation(Array(bytesByPartitionId1, bytesByPartitionId2), Seq.empty, targetSize)
}
}
@@ -114,7 +114,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite {
// All bytes per partition are 0.
val bytesByPartitionId1 = Array[Long](0, 0, 0, 0, 0)
val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0)
- val expectedPartitionSpecs = Array(CoalescedPartitionSpec(0, 5))
+ val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 5))
checkEstimation(
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionSpecs,
@@ -126,7 +126,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite {
// 1 coalesced partition is expected.
val bytesByPartitionId1 = Array[Long](0, 10, 0, 20, 0)
val bytesByPartitionId2 = Array[Long](30, 0, 20, 0, 20)
- val expectedPartitionSpecs = Array(CoalescedPartitionSpec(0, 5))
+ val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 5))
checkEstimation(
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionSpecs,
@@ -137,7 +137,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite {
// 2 coalesced partition are expected.
val bytesByPartitionId1 = Array[Long](0, 10, 0, 20, 0)
val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30)
- val expectedPartitionSpecs = Array(
+ val expectedPartitionSpecs = Seq(
CoalescedPartitionSpec(0, 2),
CoalescedPartitionSpec(2, 4),
CoalescedPartitionSpec(4, 5))
@@ -151,7 +151,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite {
// 4 coalesced partition are expected.
val bytesByPartitionId1 = Array[Long](0, 99, 0, 20, 0)
val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30)
- val expectedPartitionSpecs = Array(
+ val expectedPartitionSpecs = Seq(
CoalescedPartitionSpec(0, 1),
CoalescedPartitionSpec(1, 2),
CoalescedPartitionSpec(2, 4),
@@ -166,7 +166,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite {
// 2 coalesced partition are needed.
val bytesByPartitionId1 = Array[Long](0, 100, 0, 30, 0)
val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30)
- val expectedPartitionSpecs = Array(
+ val expectedPartitionSpecs = Seq(
CoalescedPartitionSpec(0, 1),
CoalescedPartitionSpec(1, 2),
CoalescedPartitionSpec(2, 4),
@@ -181,7 +181,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite {
// There are a few large shuffle partitions.
val bytesByPartitionId1 = Array[Long](0, 100, 40, 30, 0)
val bytesByPartitionId2 = Array[Long](30, 0, 60, 0, 110)
- val expectedPartitionSpecs = Array(
+ val expectedPartitionSpecs = Seq(
CoalescedPartitionSpec(0, 1),
CoalescedPartitionSpec(1, 2),
CoalescedPartitionSpec(2, 3),
@@ -197,7 +197,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite {
// All pairs of shuffle partitions are larger than the targeted size.
val bytesByPartitionId1 = Array[Long](100, 100, 40, 30, 0)
val bytesByPartitionId2 = Array[Long](30, 0, 60, 70, 110)
- val expectedPartitionSpecs = Array(
+ val expectedPartitionSpecs = Seq(
CoalescedPartitionSpec(0, 1),
CoalescedPartitionSpec(1, 2),
CoalescedPartitionSpec(2, 3),
@@ -219,7 +219,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite {
// the size of data is 0.
val bytesByPartitionId1 = Array[Long](0, 0, 0, 0, 0)
val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0)
- val expectedPartitionSpecs = Array(CoalescedPartitionSpec(0, 5))
+ val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 5))
checkEstimation(
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionSpecs,
@@ -230,7 +230,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite {
// The minimal number of coalesced partitions is enforced.
val bytesByPartitionId1 = Array[Long](10, 5, 5, 0, 20)
val bytesByPartitionId2 = Array[Long](5, 10, 0, 10, 5)
- val expectedPartitionSpecs = Array(CoalescedPartitionSpec(0, 3), CoalescedPartitionSpec(3, 5))
+ val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 3), CoalescedPartitionSpec(3, 5))
checkEstimation(
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionSpecs,
@@ -241,7 +241,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite {
// The number of coalesced partitions is determined by the algorithm.
val bytesByPartitionId1 = Array[Long](10, 50, 20, 80, 20)
val bytesByPartitionId2 = Array[Long](40, 10, 0, 10, 30)
- val expectedPartitionSpecs = Array(
+ val expectedPartitionSpecs = Seq(
CoalescedPartitionSpec(0, 1),
CoalescedPartitionSpec(1, 3),
CoalescedPartitionSpec(3, 4),
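As an aside (not part of the patch), the `@return` example in the `ShufflePartitionsCoalescer` scaladoc above can be reproduced with a small self-contained sketch; `CoalescedPartitionSpec` is modelled here as a plain case class purely for illustration:

```scala
// Hypothetical standalone model: split points plus the total partition count,
// taken pairwise with sliding(2), become (startReducerIndex, endReducerIndex)
// ranges, mirroring the pattern used in OptimizeLocalShuffleReader above.
case class CoalescedPartitionSpec(startReducerIndex: Int, endReducerIndex: Int)

val numPartitions = 5            // shuffle partitions [0, 1, 2, 3, 4]
val splitPoints = Seq(0, 2, 3)   // coalesced partitions start at these indices

val specs = (splitPoints :+ numPartitions).sliding(2).map {
  case Seq(start, end) => CoalescedPartitionSpec(start, end)
}.toSeq
// specs: Seq(CoalescedPartitionSpec(0, 2), CoalescedPartitionSpec(2, 3),
//            CoalescedPartitionSpec(3, 5))
```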
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]