This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ef3bc41336d7 [SPARK-46512][CORE] Optimize shuffle reading when both sort and combine are used
ef3bc41336d7 is described below
commit ef3bc41336d7267f80c9ebbfdc6c0406f83c8088
Author: zhengchenyu <[email protected]>
AuthorDate: Sun Feb 4 16:54:59 2024 -0600
[SPARK-46512][CORE] Optimize shuffle reading when both sort and combine are used
### What changes were proposed in this pull request?
After the shuffle reader fetches the blocks, it first performs a combine operation and then a sort operation. Both combine and sort may spill temporary files to disk, so performance can be poor when sort and combine are used together. The combine can instead be performed during the sort, which avoids the separate combine spill files.
See https://issues.apache.org/jira/browse/SPARK-46512 for details
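The idea in miniature: if each record is merged into a key-ordered structure as it is inserted, the output comes out sorted and combined in a single pass, so no separate combine stage (and none of its spill files) is needed. Below is a minimal in-memory sketch of that insert-time combine. It is an illustration only, not Spark's `ExternalSorter` (which buffers, spills sorted runs to disk, and merges them); the `sortAndCombine` helper and its names are hypothetical:

```scala
import scala.collection.mutable

object CombineDuringSortSketch {
  // Combine while sorting: merge each record into a key-sorted structure as it
  // arrives, producing sorted, combined output in one pass instead of a combine
  // stage followed by a separate sort stage.
  def sortAndCombine[K: Ordering, V, C](
      records: Iterator[(K, V)],
      createCombiner: V => C,
      mergeValue: (C, V) => C): Iterator[(K, C)] = {
    val combined = mutable.TreeMap.empty[K, C]
    records.foreach { case (k, v) =>
      combined.updateWith(k) {
        case Some(c) => Some(mergeValue(c, v))   // key seen before: combine
        case None    => Some(createCombiner(v))  // first value for this key
      }
    }
    combined.iterator // already sorted by key
  }

  def main(args: Array[String]): Unit = {
    // Word-count-style combine: sum values per key, output sorted by key.
    val records = Iterator(("b", 1), ("a", 2), ("b", 3), ("a", 4))
    println(sortAndCombine[String, Int, Int](records, identity, _ + _).toList)
    // List((a,6), (b,4))
  }
}
```

Spark's `ExternalSorter` achieves the same single-pass effect by accepting an `Aggregator` and a key ordering together, as the diff below shows.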
### Why are the changes needed?
This reduces the number of spills to disk when both sort and combine are used.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Spark examples were run in local and YARN mode.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #44512 from zhengchenyu/SPARK-46512.
Authored-by: zhengchenyu <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../spark/shuffle/BlockStoreShuffleReader.scala | 57 +++++++++++++---------
1 file changed, 34 insertions(+), 23 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
index c143d9691074..7918d1618eb0 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
@@ -111,31 +111,42 @@ private[spark] class BlockStoreShuffleReader[K, C](
     // An interruptible iterator must be used here in order to support task cancellation
     val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)
-    val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
-      if (dep.mapSideCombine) {
-        // We are reading values that are already combined
-        val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
-        dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
+    val resultIter: Iterator[Product2[K, C]] = {
+      // Sort the output if there is a sort ordering defined.
+      if (dep.keyOrdering.isDefined) {
+        // Create an ExternalSorter to sort the data.
+        val sorter: ExternalSorter[K, _, C] = if (dep.aggregator.isDefined) {
+          if (dep.mapSideCombine) {
+            new ExternalSorter[K, C, C](context,
+              Option(new Aggregator[K, C, C](identity,
+                dep.aggregator.get.mergeCombiners,
+                dep.aggregator.get.mergeCombiners)),
+              ordering = Some(dep.keyOrdering.get), serializer = dep.serializer)
+          } else {
+            new ExternalSorter[K, Nothing, C](context,
+              dep.aggregator.asInstanceOf[Option[Aggregator[K, Nothing, C]]],
+              ordering = Some(dep.keyOrdering.get), serializer = dep.serializer)
+          }
+        } else {
+          new ExternalSorter[K, C, C](context, ordering = Some(dep.keyOrdering.get),
+            serializer = dep.serializer)
+        }
+
+        sorter.insertAllAndUpdateMetrics(interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]])
+      } else if (dep.aggregator.isDefined) {
+        if (dep.mapSideCombine) {
+          // We are reading values that are already combined
+          val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
+          dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
+        } else {
+          // We don't know the value type, but also don't care -- the dependency *should*
+          // have made sure its compatible w/ this aggregator, which will convert the value
+          // type to the combined type C
+          val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
+          dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
+        }
       } else {
-        // We don't know the value type, but also don't care -- the dependency *should*
-        // have made sure its compatible w/ this aggregator, which will convert the value
-        // type to the combined type C
-        val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
-        dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
+        interruptibleIter.asInstanceOf[Iterator[(K, C)]]
       }
-    } else {
-      interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
-    }
-
-    // Sort the output if there is a sort ordering defined.
-    val resultIter: Iterator[Product2[K, C]] = dep.keyOrdering match {
-      case Some(keyOrd: Ordering[K]) =>
-        // Create an ExternalSorter to sort the data.
-        val sorter =
-          new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
-        sorter.insertAllAndUpdateMetrics(aggregatedIter)
-      case None =>
-        aggregatedIter
     }
 
     resultIter match {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]