Repository: spark Updated Branches: refs/heads/master 30c4774f3 -> 26c1089c3
[SPARK-15748][SQL] Replace inefficient foldLeft() call with flatMap() in PartitionStatistics `PartitionStatistics` uses `foldLeft` and list concatenation (`++`) to flatten a sequence of lists, but this is extremely inefficient compared to simply doing `flatMap`/`flatten`: each `++` copies the entire accumulated result, so the total work grows quadratically with the number of concatenated lists and performs many unnecessary object allocations. Simply replacing this `foldLeft` with a `flatMap` results in decent performance gains when constructing PartitionStatistics instances for tables with many columns. This patch fixes this and also makes two similar changes in MLlib and streaming to try to fix all known occurrences of this pattern. Author: Josh Rosen <[email protected]> Closes #13491 from JoshRosen/foldleft-to-flatmap. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26c1089c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26c1089c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26c1089c Branch: refs/heads/master Commit: 26c1089c37149061f838129bb53330ded68ff4c9 Parents: 30c4774 Author: Josh Rosen <[email protected]> Authored: Sun Jun 5 16:51:00 2016 -0700 Committer: Reynold Xin <[email protected]> Committed: Sun Jun 5 16:51:00 2016 -0700 ---------------------------------------------------------------------- mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala | 2 +- .../org/apache/spark/sql/execution/columnar/ColumnStats.scala | 4 ++-- .../main/scala/org/apache/spark/streaming/ui/StreamingPage.scala | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/26c1089c/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala index 94d1b83..8ed40c3 100644 --- 
a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala @@ -422,7 +422,7 @@ private[ml] object MetaAlgorithmReadWrite { case rformModel: RFormulaModel => Array(rformModel.pipelineModel) case _: Params => Array() } - val subStageMaps = subStages.map(getUidMapImpl).foldLeft(List.empty[(String, Params)])(_ ++ _) + val subStageMaps = subStages.flatMap(getUidMapImpl) List((instance.uid, instance)) ++ subStageMaps } } http://git-wip-us.apache.org/repos/asf/spark/blob/26c1089c/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnStats.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnStats.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnStats.scala index 5d44769..470307b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnStats.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnStats.scala @@ -33,9 +33,9 @@ private[columnar] class ColumnStatisticsSchema(a: Attribute) extends Serializabl } private[columnar] class PartitionStatistics(tableSchema: Seq[Attribute]) extends Serializable { - val (forAttribute, schema) = { + val (forAttribute: AttributeMap[ColumnStatisticsSchema], schema: Seq[AttributeReference]) = { val allStats = tableSchema.map(a => a -> new ColumnStatisticsSchema(a)) - (AttributeMap(allStats), allStats.map(_._2.schema).foldLeft(Seq.empty[Attribute])(_ ++ _)) + (AttributeMap(allStats), allStats.flatMap(_._2.schema)) } } http://git-wip-us.apache.org/repos/asf/spark/blob/26c1089c/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index b97e24f..46cd309 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -396,11 +396,11 @@ private[ui] class StreamingPage(parent: StreamingTab) .map(_.ceil.toLong) .getOrElse(0L) - val content = listener.receivedRecordRateWithBatchTime.toList.sortBy(_._1).map { + val content: Seq[Node] = listener.receivedRecordRateWithBatchTime.toList.sortBy(_._1).flatMap { case (streamId, recordRates) => generateInputDStreamRow( jsCollector, streamId, recordRates, minX, maxX, minY, maxYCalculated) - }.foldLeft[Seq[Node]](Nil)(_ ++ _) + } // scalastyle:off <table class="table table-bordered" style="width: auto"> --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
