Repository: spark
Updated Branches:
  refs/heads/branch-2.0 38a626a54 -> d8370ef11


[SPARK-15748][SQL] Replace inefficient foldLeft() call with flatMap() in 
PartitionStatistics

`PartitionStatistics` uses `foldLeft` and list concatenation (`++`) to flatten 
a collection of lists, but this is extremely inefficient compared to simply 
doing `flatMap`/`flatten`, because each `++` copies the list accumulated so 
far and therefore performs many unnecessary object allocations. Simply 
replacing this `foldLeft` with a `flatMap` results in decent performance gains 
when constructing `PartitionStatistics` instances for tables with many columns.

This patch makes that replacement and applies the same change in two other 
places, in MLlib and streaming, to try to fix all known occurrences of this 
pattern.

Author: Josh Rosen <[email protected]>

Closes #13491 from JoshRosen/foldleft-to-flatmap.

(cherry picked from commit 26c1089c37149061f838129bb53330ded68ff4c9)
Signed-off-by: Reynold Xin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8370ef1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8370ef1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8370ef1

Branch: refs/heads/branch-2.0
Commit: d8370ef117c96ebb30e9213d8d89fd3edbd796d7
Parents: 38a626a
Author: Josh Rosen <[email protected]>
Authored: Sun Jun 5 16:51:00 2016 -0700
Committer: Reynold Xin <[email protected]>
Committed: Sun Jun 5 16:51:06 2016 -0700

----------------------------------------------------------------------
 mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala    | 2 +-
 .../org/apache/spark/sql/execution/columnar/ColumnStats.scala    | 4 ++--
 .../main/scala/org/apache/spark/streaming/ui/StreamingPage.scala | 4 ++--
 3 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d8370ef1/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala 
b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
index 94d1b83..8ed40c3 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
@@ -422,7 +422,7 @@ private[ml] object MetaAlgorithmReadWrite {
       case rformModel: RFormulaModel => Array(rformModel.pipelineModel)
       case _: Params => Array()
     }
-    val subStageMaps = 
subStages.map(getUidMapImpl).foldLeft(List.empty[(String, Params)])(_ ++ _)
+    val subStageMaps = subStages.flatMap(getUidMapImpl)
     List((instance.uid, instance)) ++ subStageMaps
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d8370ef1/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnStats.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnStats.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnStats.scala
index 5d44769..470307b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnStats.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnStats.scala
@@ -33,9 +33,9 @@ private[columnar] class ColumnStatisticsSchema(a: Attribute) 
extends Serializabl
 }
 
 private[columnar] class PartitionStatistics(tableSchema: Seq[Attribute]) 
extends Serializable {
-  val (forAttribute, schema) = {
+  val (forAttribute: AttributeMap[ColumnStatisticsSchema], schema: 
Seq[AttributeReference]) = {
     val allStats = tableSchema.map(a => a -> new ColumnStatisticsSchema(a))
-    (AttributeMap(allStats), 
allStats.map(_._2.schema).foldLeft(Seq.empty[Attribute])(_ ++ _))
+    (AttributeMap(allStats), allStats.flatMap(_._2.schema))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d8370ef1/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index b97e24f..46cd309 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -396,11 +396,11 @@ private[ui] class StreamingPage(parent: StreamingTab)
       .map(_.ceil.toLong)
       .getOrElse(0L)
 
-    val content = 
listener.receivedRecordRateWithBatchTime.toList.sortBy(_._1).map {
+    val content: Seq[Node] = 
listener.receivedRecordRateWithBatchTime.toList.sortBy(_._1).flatMap {
       case (streamId, recordRates) =>
         generateInputDStreamRow(
           jsCollector, streamId, recordRates, minX, maxX, minY, maxYCalculated)
-    }.foldLeft[Seq[Node]](Nil)(_ ++ _)
+    }
 
     // scalastyle:off
     <table class="table table-bordered" style="width: auto">


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to