Repository: spark Updated Branches: refs/heads/branch-2.0 38a626a54 -> d8370ef11
[SPARK-15748][SQL] Replace inefficient foldLeft() call with flatMap() in PartitionStatistics `PartitionStatistics` uses `foldLeft` and list concatenation (`++`) to flatten an iterator of lists, but this is extremely inefficient compared to simply doing `flatMap`/`flatten` because it performs many unnecessary object allocations. Simply replacing this `foldLeft` by a `flatMap` results in decent performance gains when constructing PartitionStatistics instances for tables with many columns. This patch fixes this and also makes two similar changes in MLlib and streaming to try to fix all known occurrences of this pattern. Author: Josh Rosen <[email protected]> Closes #13491 from JoshRosen/foldleft-to-flatmap. (cherry picked from commit 26c1089c37149061f838129bb53330ded68ff4c9) Signed-off-by: Reynold Xin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8370ef1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8370ef1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8370ef1 Branch: refs/heads/branch-2.0 Commit: d8370ef117c96ebb30e9213d8d89fd3edbd796d7 Parents: 38a626a Author: Josh Rosen <[email protected]> Authored: Sun Jun 5 16:51:00 2016 -0700 Committer: Reynold Xin <[email protected]> Committed: Sun Jun 5 16:51:06 2016 -0700 ---------------------------------------------------------------------- mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala | 2 +- .../org/apache/spark/sql/execution/columnar/ColumnStats.scala | 4 ++-- .../main/scala/org/apache/spark/streaming/ui/StreamingPage.scala | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d8370ef1/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala index 94d1b83..8ed40c3 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala @@ -422,7 +422,7 @@ private[ml] object MetaAlgorithmReadWrite { case rformModel: RFormulaModel => Array(rformModel.pipelineModel) case _: Params => Array() } - val subStageMaps = subStages.map(getUidMapImpl).foldLeft(List.empty[(String, Params)])(_ ++ _) + val subStageMaps = subStages.flatMap(getUidMapImpl) List((instance.uid, instance)) ++ subStageMaps } } http://git-wip-us.apache.org/repos/asf/spark/blob/d8370ef1/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnStats.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnStats.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnStats.scala index 5d44769..470307b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnStats.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnStats.scala @@ -33,9 +33,9 @@ private[columnar] class ColumnStatisticsSchema(a: Attribute) extends Serializabl } private[columnar] class PartitionStatistics(tableSchema: Seq[Attribute]) extends Serializable { - val (forAttribute, schema) = { + val (forAttribute: AttributeMap[ColumnStatisticsSchema], schema: Seq[AttributeReference]) = { val allStats = tableSchema.map(a => a -> new ColumnStatisticsSchema(a)) - (AttributeMap(allStats), allStats.map(_._2.schema).foldLeft(Seq.empty[Attribute])(_ ++ _)) + (AttributeMap(allStats), allStats.flatMap(_._2.schema)) } } http://git-wip-us.apache.org/repos/asf/spark/blob/d8370ef1/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index b97e24f..46cd309 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -396,11 +396,11 @@ private[ui] class StreamingPage(parent: StreamingTab) .map(_.ceil.toLong) .getOrElse(0L) - val content = listener.receivedRecordRateWithBatchTime.toList.sortBy(_._1).map { + val content: Seq[Node] = listener.receivedRecordRateWithBatchTime.toList.sortBy(_._1).flatMap { case (streamId, recordRates) => generateInputDStreamRow( jsCollector, streamId, recordRates, minX, maxX, minY, maxYCalculated) - }.foldLeft[Seq[Node]](Nil)(_ ++ _) + } // scalastyle:off <table class="table table-bordered" style="width: auto"> --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
