Repository: spark Updated Branches: refs/heads/master 9f5c77ae3 -> 9f02d7dc5
[SPARK-22385][SQL] MapObjects should not access list element by index ## What changes were proposed in this pull request? This issue was discovered and investigated by Ohad Raviv and Sean Owen in https://issues.apache.org/jira/browse/SPARK-21657. The input data of `MapObjects` may be a `List` which has O(n) complexity for accessing by index. When converting input data to catalyst array, `MapObjects` gets element by index in each loop, and results to bad performance. This PR fixes this issue by accessing elements via Iterator. ## How was this patch tested? using the test script in https://issues.apache.org/jira/browse/SPARK-21657 ``` val BASE = 100000000 val N = 100000 val df = sc.parallelize(List(("1234567890", (BASE to (BASE+N)).map(x => (x.toString, (x+1).toString, (x+2).toString, (x+3).toString)).toList ))).toDF("c1", "c_arr") spark.time(df.queryExecution.toRdd.foreach(_ => ())) ``` We can see 50x speed up. Author: Wenchen Fan <[email protected]> Closes #19603 from cloud-fan/map-objects. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9f02d7dc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9f02d7dc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9f02d7dc Branch: refs/heads/master Commit: 9f02d7dc537b73988468b11337dbb14a8602f246 Parents: 9f5c77a Author: Wenchen Fan <[email protected]> Authored: Mon Oct 30 11:00:44 2017 +0100 Committer: Wenchen Fan <[email protected]> Committed: Mon Oct 30 11:00:44 2017 +0100 ---------------------------------------------------------------------- .../catalyst/expressions/objects/objects.scala | 40 ++++++++++++++++---- 1 file changed, 33 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/9f02d7dc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala index 9b28a18..6ae3490 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala @@ -591,18 +591,43 @@ case class MapObjects private( case _ => inputData.dataType } - val (getLength, getLoopVar) = inputDataType match { + // `MapObjects` generates a while loop to traverse the elements of the input collection. We + // need to take care of Seq and List because they may have O(n) complexity for indexed accessing + // like `list.get(1)`. Here we use Iterator to traverse Seq and List. + val (getLength, prepareLoop, getLoopVar) = inputDataType match { case ObjectType(cls) if classOf[Seq[_]].isAssignableFrom(cls) => - s"${genInputData.value}.size()" -> s"${genInputData.value}.apply($loopIndex)" + val it = ctx.freshName("it") + ( + s"${genInputData.value}.size()", + s"scala.collection.Iterator $it = ${genInputData.value}.toIterator();", + s"$it.next()" + ) case ObjectType(cls) if cls.isArray => - s"${genInputData.value}.length" -> s"${genInputData.value}[$loopIndex]" + ( + s"${genInputData.value}.length", + "", + s"${genInputData.value}[$loopIndex]" + ) case ObjectType(cls) if classOf[java.util.List[_]].isAssignableFrom(cls) => - s"${genInputData.value}.size()" -> s"${genInputData.value}.get($loopIndex)" + val it = ctx.freshName("it") + ( + s"${genInputData.value}.size()", + s"java.util.Iterator $it = ${genInputData.value}.iterator();", + s"$it.next()" + ) case ArrayType(et, _) => - s"${genInputData.value}.numElements()" -> ctx.getValue(genInputData.value, et, loopIndex) + ( + s"${genInputData.value}.numElements()", + "", + ctx.getValue(genInputData.value, et, loopIndex) + ) case ObjectType(cls) if cls == classOf[Object] => - s"$seq == null ? $array.length : $seq.size()" -> - s"$seq == null ? $array[$loopIndex] : $seq.apply($loopIndex)" + val it = ctx.freshName("it") + ( + s"$seq == null ? $array.length : $seq.size()", + s"scala.collection.Iterator $it = $seq == null ? null : $seq.toIterator();", + s"$it == null ? $array[$loopIndex] : $it.next()" + ) } // Make a copy of the data if it's unsafe-backed @@ -676,6 +701,7 @@ case class MapObjects private( $initCollection int $loopIndex = 0; + $prepareLoop while ($loopIndex < $dataLength) { $loopValue = ($elementJavaType) ($getLoopVar); $loopNullCheck --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
