Repository: spark
Updated Branches:
  refs/heads/master 05bf4e4af -> 066765d60


SPARK-2685. Update ExternalAppendOnlyMap to avoid buffer.remove()

Replaces buffer.remove() with an O(1) operation that swaps the last element
into the removed slot, so it no longer has to shift the whole tail of the array
over to fill the gap left by the removed element.

Author: Matei Zaharia <[email protected]>

Closes #1773 from mateiz/SPARK-2685 and squashes the following commits:

1ea028a [Matei Zaharia] Update comments in StreamBuffer and EAOM, and reuse ArrayBuffers
eb1abfd [Matei Zaharia] Update ExternalAppendOnlyMap to avoid buffer.remove()


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/066765d6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/066765d6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/066765d6

Branch: refs/heads/master
Commit: 066765d60d21b6b9943862b788e4a4bd07396e6c
Parents: 05bf4e4
Author: Matei Zaharia <[email protected]>
Authored: Mon Aug 4 23:27:53 2014 -0700
Committer: Matei Zaharia <[email protected]>
Committed: Mon Aug 4 23:27:53 2014 -0700

----------------------------------------------------------------------
 .../util/collection/ExternalAppendOnlyMap.scala | 50 ++++++++++++++------
 1 file changed, 35 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/066765d6/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 5d10a1f..1f7d2dc 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -286,30 +286,32 @@ class ExternalAppendOnlyMap[K, V, C](
     private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => 
it.buffered)
 
     inputStreams.foreach { it =>
-      val kcPairs = getMorePairs(it)
+      val kcPairs = new ArrayBuffer[(K, C)]
+      readNextHashCode(it, kcPairs)
       if (kcPairs.length > 0) {
         mergeHeap.enqueue(new StreamBuffer(it, kcPairs))
       }
     }
 
     /**
-     * Fetch from the given iterator until a key of different hash is 
retrieved.
+     * Fill a buffer with the next set of keys with the same hash code from a 
given iterator. We
+     * read streams one hash code at a time to ensure we don't miss elements 
when they are merged.
+     *
+     * Assumes the given iterator is in sorted order of hash code.
      *
-     * In the event of key hash collisions, this ensures no pairs are hidden 
from being merged.
-     * Assume the given iterator is in sorted order.
+     * @param it iterator to read from
+     * @param buf buffer to write the results into
      */
-    private def getMorePairs(it: BufferedIterator[(K, C)]): ArrayBuffer[(K, 
C)] = {
-      val kcPairs = new ArrayBuffer[(K, C)]
+    private def readNextHashCode(it: BufferedIterator[(K, C)], buf: 
ArrayBuffer[(K, C)]): Unit = {
       if (it.hasNext) {
         var kc = it.next()
-        kcPairs += kc
+        buf += kc
         val minHash = hashKey(kc)
         while (it.hasNext && it.head._1.hashCode() == minHash) {
           kc = it.next()
-          kcPairs += kc
+          buf += kc
         }
       }
-      kcPairs
     }
 
     /**
@@ -321,7 +323,9 @@ class ExternalAppendOnlyMap[K, V, C](
       while (i < buffer.pairs.length) {
         val pair = buffer.pairs(i)
         if (pair._1 == key) {
-          buffer.pairs.remove(i)
+          // Note that there's at most one pair in the buffer with a given 
key, since we always
+          // merge stuff in a map before spilling, so it's safe to return 
after the first we find
+          removeFromBuffer(buffer.pairs, i)
           return mergeCombiners(baseCombiner, pair._2)
         }
         i += 1
@@ -330,6 +334,19 @@ class ExternalAppendOnlyMap[K, V, C](
     }
 
     /**
+     * Remove the index'th element from an ArrayBuffer in constant time, 
swapping another element
+     * into its place. This is more efficient than the ArrayBuffer.remove 
method because it does
+     * not have to shift all the elements in the array over. It works for our 
array buffers because
+     * we don't care about the order of elements inside, we just want to 
search them for a key.
+     */
+    private def removeFromBuffer[T](buffer: ArrayBuffer[T], index: Int): T = {
+      val elem = buffer(index)
+      buffer(index) = buffer(buffer.size - 1)  // This also works if index == 
buffer.size - 1
+      buffer.reduceToSize(buffer.size - 1)
+      elem
+    }
+
+    /**
      * Return true if there exists an input stream that still has unvisited 
pairs.
      */
     override def hasNext: Boolean = mergeHeap.length > 0
@@ -346,7 +363,7 @@ class ExternalAppendOnlyMap[K, V, C](
       val minBuffer = mergeHeap.dequeue()
       val minPairs = minBuffer.pairs
       val minHash = minBuffer.minKeyHash
-      val minPair = minPairs.remove(0)
+      val minPair = removeFromBuffer(minPairs, 0)
       val minKey = minPair._1
       var minCombiner = minPair._2
       assert(hashKey(minPair) == minHash)
@@ -363,7 +380,7 @@ class ExternalAppendOnlyMap[K, V, C](
       // Repopulate each visited stream buffer and add it back to the queue if 
it is non-empty
       mergedBuffers.foreach { buffer =>
         if (buffer.isEmpty) {
-          buffer.pairs ++= getMorePairs(buffer.iterator)
+          readNextHashCode(buffer.iterator, buffer.pairs)
         }
         if (!buffer.isEmpty) {
           mergeHeap.enqueue(buffer)
@@ -375,10 +392,13 @@ class ExternalAppendOnlyMap[K, V, C](
 
     /**
      * A buffer for streaming from a map iterator (in-memory or on-disk) 
sorted by key hash.
-     * Each buffer maintains the lowest-ordered keys in the corresponding 
iterator. Due to
-     * hash collisions, it is possible for multiple keys to be "tied" for 
being the lowest.
+     * Each buffer maintains all of the key-value pairs with what is currently 
the lowest hash
+     * code among keys in the stream. There may be multiple keys if there are 
hash collisions.
+     * Note that because when we spill data out, we only spill one value for 
each key, there is
+     * at most one element for each key.
      *
-     * StreamBuffers are ordered by the minimum key hash found across all of 
their own pairs.
+     * StreamBuffers are ordered by the minimum key hash currently available 
in their stream so
+     * that we can put them into a heap and sort that.
      */
     private class StreamBuffer(
         val iterator: BufferedIterator[(K, C)],


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to