Repository: spark
Updated Branches:
  refs/heads/master 59f84a953 -> 8e7d5ba1a


SPARK-2792. Fix reading too much or too little data from each stream in 
ExternalMap / Sorter

All these changes are from mridulm's work in #1609, but extracted here to fix 
this specific issue and make it easier to merge into 1.1. This particular set of 
changes is to make sure that we read exactly the right range of bytes from each 
spill file in EAOM: some serializers can write bytes after the last object 
(e.g. the TC_RESET flag in Java serialization) and that would confuse the 
previous code into reading it as part of the next batch. There are also 
improvements to cleanup to make sure files are closed.

In addition to bringing in the changes to ExternalAppendOnlyMap, I also copied 
them to the corresponding code in ExternalSorter and updated its test suite to 
test for the same issues.

Author: Matei Zaharia <[email protected]>

Closes #1722 from mateiz/spark-2792 and squashes the following commits:

5d4bfb5 [Matei Zaharia] Make objectStreamReset counter count the last object 
written too
18fe865 [Matei Zaharia] Update docs on objectStreamReset
576ee83 [Matei Zaharia] Allow objectStreamReset to be 0
0374217 [Matei Zaharia] Remove super paranoid code to close file handles
bda37bb [Matei Zaharia] Implement Mridul's ExternalAppendOnlyMap fixes in 
ExternalSorter too
0d6dad7 [Matei Zaharia] Added Mridul's test changes for ExternalAppendOnlyMap
9a78e4b [Matei Zaharia] Add @mridulm's fixes to ExternalAppendOnlyMap for batch 
sizes


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8e7d5ba1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8e7d5ba1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8e7d5ba1

Branch: refs/heads/master
Commit: 8e7d5ba1a20a8a1f409e9d6472ae3e6c4bc948b4
Parents: 59f84a9
Author: Matei Zaharia <[email protected]>
Authored: Mon Aug 4 12:59:18 2014 -0700
Committer: Matei Zaharia <[email protected]>
Committed: Mon Aug 4 12:59:18 2014 -0700

----------------------------------------------------------------------
 .../spark/serializer/JavaSerializer.scala       |   5 +-
 .../util/collection/ExternalAppendOnlyMap.scala |  86 +++++++++++----
 .../spark/util/collection/ExternalSorter.scala  | 104 +++++++++++++------
 .../collection/ExternalAppendOnlyMapSuite.scala |  33 ++++--
 .../util/collection/ExternalSorterSuite.scala   |  47 +++++----
 docs/configuration.md                           |   2 +-
 6 files changed, 194 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8e7d5ba1/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala 
b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index a7fa057..34bc312 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -35,16 +35,15 @@ private[spark] class JavaSerializationStream(out: 
OutputStream, counterReset: In
   /**
    * Calling reset to avoid memory leak:
    * 
http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api
-   * But only call it every 10,000th time to avoid bloated serialization 
streams (when
+   * But only call it every 100th time to avoid bloated serialization streams 
(when
    * the stream 'resets' object class descriptions have to be re-written)
    */
   def writeObject[T: ClassTag](t: T): SerializationStream = {
     objOut.writeObject(t)
+    counter += 1
     if (counterReset > 0 && counter >= counterReset) {
       objOut.reset()
       counter = 0
-    } else {
-      counter += 1
     }
     this
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/8e7d5ba1/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index cb67a1c..5d10a1f 100644
--- 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.util.collection
 
-import java.io.{InputStream, BufferedInputStream, FileInputStream, File, 
Serializable, EOFException}
+import java.io._
 import java.util.Comparator
 
 import scala.collection.BufferedIterator
@@ -28,7 +28,7 @@ import com.google.common.io.ByteStreams
 
 import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.serializer.Serializer
+import org.apache.spark.serializer.{DeserializationStream, Serializer}
 import org.apache.spark.storage.{BlockId, BlockManager}
 import org.apache.spark.util.collection.ExternalAppendOnlyMap.HashComparator
 
@@ -199,13 +199,16 @@ class ExternalAppendOnlyMap[K, V, C](
 
     // Flush the disk writer's contents to disk, and update relevant variables
     def flush() = {
-      writer.commitAndClose()
-      val bytesWritten = writer.bytesWritten
+      val w = writer
+      writer = null
+      w.commitAndClose()
+      val bytesWritten = w.bytesWritten
       batchSizes.append(bytesWritten)
       _diskBytesSpilled += bytesWritten
       objectsWritten = 0
     }
 
+    var success = false
     try {
       val it = currentMap.destructiveSortedIterator(keyComparator)
       while (it.hasNext) {
@@ -215,16 +218,28 @@ class ExternalAppendOnlyMap[K, V, C](
 
         if (objectsWritten == serializerBatchSize) {
           flush()
-          writer.close()
           writer = blockManager.getDiskWriter(blockId, file, serializer, 
fileBufferSize)
         }
       }
       if (objectsWritten > 0) {
         flush()
+      } else if (writer != null) {
+        val w = writer
+        writer = null
+        w.revertPartialWritesAndClose()
       }
+      success = true
     } finally {
-      // Partial failures cannot be tolerated; do not revert partial writes
-      writer.close()
+      if (!success) {
+        // This code path only happens if an exception was thrown above before 
we set success;
+        // close our stuff and let the exception be thrown further
+        if (writer != null) {
+          writer.revertPartialWritesAndClose()
+        }
+        if (file.exists()) {
+          file.delete()
+        }
+      }
     }
 
     currentMap = new SizeTrackingAppendOnlyMap[K, C]
@@ -389,27 +404,51 @@ class ExternalAppendOnlyMap[K, V, C](
    * An iterator that returns (K, C) pairs in sorted order from an on-disk map
    */
   private class DiskMapIterator(file: File, blockId: BlockId, batchSizes: 
ArrayBuffer[Long])
-    extends Iterator[(K, C)] {
-    private val fileStream = new FileInputStream(file)
-    private val bufferedStream = new BufferedInputStream(fileStream, 
fileBufferSize)
+    extends Iterator[(K, C)]
+  {
+    private val batchOffsets = batchSizes.scanLeft(0L)(_ + _)  // Size will be 
batchSize.length + 1
+    assert(file.length() == batchOffsets(batchOffsets.length - 1))
+
+    private var batchIndex = 0  // Which batch we're in
+    private var fileStream: FileInputStream = null
 
     // An intermediate stream that reads from exactly one batch
     // This guards against pre-fetching and other arbitrary behavior of higher 
level streams
-    private var batchStream = nextBatchStream()
-    private var compressedStream = blockManager.wrapForCompression(blockId, 
batchStream)
-    private var deserializeStream = ser.deserializeStream(compressedStream)
+    private var deserializeStream = nextBatchStream()
     private var nextItem: (K, C) = null
     private var objectsRead = 0
 
     /**
      * Construct a stream that reads only from the next batch.
      */
-    private def nextBatchStream(): InputStream = {
-      if (batchSizes.length > 0) {
-        ByteStreams.limit(bufferedStream, batchSizes.remove(0))
+    private def nextBatchStream(): DeserializationStream = {
+      // Note that batchOffsets.length = numBatches + 1 since we did a scan 
above; check whether
+      // we're still in a valid batch.
+      if (batchIndex < batchOffsets.length - 1) {
+        if (deserializeStream != null) {
+          deserializeStream.close()
+          fileStream.close()
+          deserializeStream = null
+          fileStream = null
+        }
+
+        val start = batchOffsets(batchIndex)
+        fileStream = new FileInputStream(file)
+        fileStream.getChannel.position(start)
+        batchIndex += 1
+
+        val end = batchOffsets(batchIndex)
+
+        assert(end >= start, "start = " + start + ", end = " + end +
+          ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
+
+        val bufferedStream = new 
BufferedInputStream(ByteStreams.limit(fileStream, end - start))
+        val compressedStream = blockManager.wrapForCompression(blockId, 
bufferedStream)
+        ser.deserializeStream(compressedStream)
       } else {
         // No more batches left
-        bufferedStream
+        cleanup()
+        null
       }
     }
 
@@ -424,10 +463,8 @@ class ExternalAppendOnlyMap[K, V, C](
         val item = deserializeStream.readObject().asInstanceOf[(K, C)]
         objectsRead += 1
         if (objectsRead == serializerBatchSize) {
-          batchStream = nextBatchStream()
-          compressedStream = blockManager.wrapForCompression(blockId, 
batchStream)
-          deserializeStream = ser.deserializeStream(compressedStream)
           objectsRead = 0
+          deserializeStream = nextBatchStream()
         }
         item
       } catch {
@@ -439,6 +476,9 @@ class ExternalAppendOnlyMap[K, V, C](
 
     override def hasNext: Boolean = {
       if (nextItem == null) {
+        if (deserializeStream == null) {
+          return false
+        }
         nextItem = readNextItem()
       }
       nextItem != null
@@ -455,7 +495,11 @@ class ExternalAppendOnlyMap[K, V, C](
 
     // TODO: Ensure this gets called even if the iterator isn't drained.
     private def cleanup() {
-      deserializeStream.close()
+      batchIndex = batchOffsets.length  // Prevent reading any other batch
+      val ds = deserializeStream
+      deserializeStream = null
+      fileStream = null
+      ds.close()
       file.delete()
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/8e7d5ba1/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 6e415a2..b04c50b 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable
 import com.google.common.io.ByteStreams
 
 import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
-import org.apache.spark.serializer.Serializer
+import org.apache.spark.serializer.{DeserializationStream, Serializer}
 import org.apache.spark.storage.BlockId
 
 /**
@@ -273,13 +273,16 @@ private[spark] class ExternalSorter[K, V, C](
     // Flush the disk writer's contents to disk, and update relevant variables.
     // The writer is closed at the end of this process, and cannot be reused.
     def flush() = {
-      writer.commitAndClose()
-      val bytesWritten = writer.bytesWritten
+      val w = writer
+      writer = null
+      w.commitAndClose()
+      val bytesWritten = w.bytesWritten
       batchSizes.append(bytesWritten)
       _diskBytesSpilled += bytesWritten
       objectsWritten = 0
     }
 
+    var success = false
     try {
       val it = collection.destructiveSortedIterator(partitionKeyComparator)
       while (it.hasNext) {
@@ -299,13 +302,23 @@ private[spark] class ExternalSorter[K, V, C](
       }
       if (objectsWritten > 0) {
         flush()
+      } else if (writer != null) {
+        val w = writer
+        writer = null
+        w.revertPartialWritesAndClose()
+      }
+      success = true
+    } finally {
+      if (!success) {
+        // This code path only happens if an exception was thrown above before 
we set success;
+        // close our stuff and let the exception be thrown further
+        if (writer != null) {
+          writer.revertPartialWritesAndClose()
+        }
+        if (file.exists()) {
+          file.delete()
+        }
       }
-      writer.close()
-    } catch {
-      case e: Exception =>
-        writer.close()
-        file.delete()
-        throw e
     }
 
     if (usingMap) {
@@ -472,36 +485,58 @@ private[spark] class ExternalSorter[K, V, C](
    * partitions to be requested in order.
    */
   private[this] class SpillReader(spill: SpilledFile) {
-    val fileStream = new FileInputStream(spill.file)
-    val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize)
+    // Serializer batch offsets; size will be batchSize.length + 1
+    val batchOffsets = spill.serializerBatchSizes.scanLeft(0L)(_ + _)
 
     // Track which partition and which batch stream we're in. These will be 
the indices of
     // the next element we will read. We'll also store the last partition read 
so that
     // readNextPartition() can figure out what partition that was from.
     var partitionId = 0
     var indexInPartition = 0L
-    var batchStreamsRead = 0
+    var batchId = 0
     var indexInBatch = 0
     var lastPartitionId = 0
 
     skipToNextPartition()
 
-    // An intermediate stream that reads from exactly one batch
+
+    // Intermediate file and deserializer streams that read from exactly one 
batch
     // This guards against pre-fetching and other arbitrary behavior of higher 
level streams
-    var batchStream = nextBatchStream()
-    var compressedStream = blockManager.wrapForCompression(spill.blockId, 
batchStream)
-    var deserStream = serInstance.deserializeStream(compressedStream)
+    var fileStream: FileInputStream = null
+    var deserializeStream = nextBatchStream()  // Also sets fileStream
+
     var nextItem: (K, C) = null
     var finished = false
 
     /** Construct a stream that only reads from the next batch */
-    def nextBatchStream(): InputStream = {
-      if (batchStreamsRead < spill.serializerBatchSizes.length) {
-        batchStreamsRead += 1
-        ByteStreams.limit(bufferedStream, 
spill.serializerBatchSizes(batchStreamsRead - 1))
+    def nextBatchStream(): DeserializationStream = {
+      // Note that batchOffsets.length = numBatches + 1 since we did a scan 
above; check whether
+      // we're still in a valid batch.
+      if (batchId < batchOffsets.length - 1) {
+        if (deserializeStream != null) {
+          deserializeStream.close()
+          fileStream.close()
+          deserializeStream = null
+          fileStream = null
+        }
+
+        val start = batchOffsets(batchId)
+        fileStream = new FileInputStream(spill.file)
+        fileStream.getChannel.position(start)
+        batchId += 1
+
+        val end = batchOffsets(batchId)
+
+        assert(end >= start, "start = " + start + ", end = " + end +
+          ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
+
+        val bufferedStream = new 
BufferedInputStream(ByteStreams.limit(fileStream, end - start))
+        val compressedStream = blockManager.wrapForCompression(spill.blockId, 
bufferedStream)
+        serInstance.deserializeStream(compressedStream)
       } else {
-        // No more batches left; give an empty stream
-        bufferedStream
+        // No more batches left
+        cleanup()
+        null
       }
     }
 
@@ -525,19 +560,17 @@ private[spark] class ExternalSorter[K, V, C](
      * If no more pairs are left, return null.
      */
     private def readNextItem(): (K, C) = {
-      if (finished) {
+      if (finished || deserializeStream == null) {
         return null
       }
-      val k = deserStream.readObject().asInstanceOf[K]
-      val c = deserStream.readObject().asInstanceOf[C]
+      val k = deserializeStream.readObject().asInstanceOf[K]
+      val c = deserializeStream.readObject().asInstanceOf[C]
       lastPartitionId = partitionId
       // Start reading the next batch if we're done with this one
       indexInBatch += 1
       if (indexInBatch == serializerBatchSize) {
-        batchStream = nextBatchStream()
-        compressedStream = blockManager.wrapForCompression(spill.blockId, 
batchStream)
-        deserStream = serInstance.deserializeStream(compressedStream)
         indexInBatch = 0
+        deserializeStream = nextBatchStream()
       }
       // Update the partition location of the element we're reading
       indexInPartition += 1
@@ -545,7 +578,9 @@ private[spark] class ExternalSorter[K, V, C](
       // If we've finished reading the last partition, remember that we're done
       if (partitionId == numPartitions) {
         finished = true
-        deserStream.close()
+        if (deserializeStream != null) {
+          deserializeStream.close()
+        }
       }
       (k, c)
     }
@@ -578,6 +613,17 @@ private[spark] class ExternalSorter[K, V, C](
         item
       }
     }
+
+    // Clean up our open streams and put us in a state where we can't read any 
more data
+    def cleanup() {
+      batchId = batchOffsets.length  // Prevent reading any other batch
+      val ds = deserializeStream
+      deserializeStream = null
+      fileStream = null
+      ds.close()
+      // NOTE: We don't do file.delete() here because that is done in 
ExternalSorter.stop().
+      // This should also be fixed in ExternalAppendOnlyMap.
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/8e7d5ba1/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 
b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 7de5df6..04d7338 100644
--- 
a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -30,8 +30,19 @@ class ExternalAppendOnlyMapSuite extends FunSuite with 
LocalSparkContext {
   private def mergeValue(buffer: ArrayBuffer[Int], i: Int) = buffer += i
   private def mergeCombiners(buf1: ArrayBuffer[Int], buf2: ArrayBuffer[Int]) = 
buf1 ++= buf2
 
+  private def createSparkConf(loadDefaults: Boolean): SparkConf = {
+    val conf = new SparkConf(loadDefaults)
+    // Make the Java serializer write a reset instruction (TC_RESET) after 
each object to test
+    // for a bug we had with bytes written past the last object in a batch 
(SPARK-2792)
+    conf.set("spark.serializer.objectStreamReset", "1")
+    conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+    // Ensure that we actually have multiple batches per spill file
+    conf.set("spark.shuffle.spill.batchSize", "10")
+    conf
+  }
+
   test("simple insert") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     sc = new SparkContext("local", "test", conf)
 
     val map = new ExternalAppendOnlyMap[Int, Int, 
ArrayBuffer[Int]](createCombiner,
@@ -57,7 +68,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test("insert with collision") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     sc = new SparkContext("local", "test", conf)
 
     val map = new ExternalAppendOnlyMap[Int, Int, 
ArrayBuffer[Int]](createCombiner,
@@ -80,7 +91,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test("ordering") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     sc = new SparkContext("local", "test", conf)
 
     val map1 = new ExternalAppendOnlyMap[Int, Int, 
ArrayBuffer[Int]](createCombiner,
@@ -125,7 +136,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test("null keys and values") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     sc = new SparkContext("local", "test", conf)
 
     val map = new ExternalAppendOnlyMap[Int, Int, 
ArrayBuffer[Int]](createCombiner,
@@ -166,7 +177,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test("simple aggregator") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     sc = new SparkContext("local", "test", conf)
 
     // reduceByKey
@@ -181,7 +192,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test("simple cogroup") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     sc = new SparkContext("local", "test", conf)
     val rdd1 = sc.parallelize(1 to 4).map(i => (i, i))
     val rdd2 = sc.parallelize(1 to 4).map(i => (i%2, i))
@@ -199,7 +210,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test("spilling") {
-    val conf = new SparkConf(true)  // Load defaults, otherwise SPARK_HOME is 
not found
+    val conf = createSparkConf(true)  // Load defaults, otherwise SPARK_HOME 
is not found
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
 
@@ -249,7 +260,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test("spilling with hash collisions") {
-    val conf = new SparkConf(true)
+    val conf = createSparkConf(true)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
 
@@ -304,7 +315,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test("spilling with many hash collisions") {
-    val conf = new SparkConf(true)
+    val conf = createSparkConf(true)
     conf.set("spark.shuffle.memoryFraction", "0.0001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
 
@@ -329,7 +340,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test("spilling with hash collisions using the Int.MaxValue key") {
-    val conf = new SparkConf(true)
+    val conf = createSparkConf(true)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
 
@@ -347,7 +358,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test("spilling with null keys and values") {
-    val conf = new SparkConf(true)
+    val conf = createSparkConf(true)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8e7d5ba1/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
 
b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index 65a71e5..57dcb4f 100644
--- 
a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -25,6 +25,17 @@ import org.apache.spark._
 import org.apache.spark.SparkContext._
 
 class ExternalSorterSuite extends FunSuite with LocalSparkContext {
+  private def createSparkConf(loadDefaults: Boolean): SparkConf = {
+    val conf = new SparkConf(loadDefaults)
+    // Make the Java serializer write a reset instruction (TC_RESET) after 
each object to test
+    // for a bug we had with bytes written past the last object in a batch 
(SPARK-2792)
+    conf.set("spark.serializer.objectStreamReset", "1")
+    conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+    // Ensure that we actually have multiple batches per spill file
+    conf.set("spark.shuffle.spill.batchSize", "10")
+    conf
+  }
+
   test("empty data stream") {
     val conf = new SparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
@@ -60,7 +71,7 @@ class ExternalSorterSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test("few elements per partition") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -102,7 +113,7 @@ class ExternalSorterSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test("empty partitions with spilling") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -127,7 +138,7 @@ class ExternalSorterSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test("spilling in local cluster") {
-    val conf = new SparkConf(true)  // Load defaults, otherwise SPARK_HOME is 
not found
+    val conf = createSparkConf(true)  // Load defaults, otherwise SPARK_HOME 
is not found
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
@@ -198,7 +209,7 @@ class ExternalSorterSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test("spilling in local cluster with many reduce tasks") {
-    val conf = new SparkConf(true)  // Load defaults, otherwise SPARK_HOME is 
not found
+    val conf = createSparkConf(true)  // Load defaults, otherwise SPARK_HOME 
is not found
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
@@ -269,7 +280,7 @@ class ExternalSorterSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test("cleanup of intermediate files in sorter") {
-    val conf = new SparkConf(true)  // Load defaults, otherwise SPARK_HOME is 
not found
+    val conf = createSparkConf(true)  // Load defaults, otherwise SPARK_HOME 
is not found
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -290,7 +301,7 @@ class ExternalSorterSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test("cleanup of intermediate files in sorter if there are errors") {
-    val conf = new SparkConf(true)  // Load defaults, otherwise SPARK_HOME is 
not found
+    val conf = createSparkConf(true)  // Load defaults, otherwise SPARK_HOME 
is not found
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -311,7 +322,7 @@ class ExternalSorterSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test("cleanup of intermediate files in shuffle") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -326,7 +337,7 @@ class ExternalSorterSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test("cleanup of intermediate files in shuffle with errors") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -348,7 +359,7 @@ class ExternalSorterSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test("no partial aggregation or sorting") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -363,7 +374,7 @@ class ExternalSorterSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test("partial aggregation without spill") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -379,7 +390,7 @@ class ExternalSorterSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test("partial aggregation with spill, no ordering") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -395,7 +406,7 @@ class ExternalSorterSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test("partial aggregation with spill, with ordering") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -412,7 +423,7 @@ class ExternalSorterSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test("sorting without aggregation, no spill") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -429,7 +440,7 @@ class ExternalSorterSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test("sorting without aggregation, with spill") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -446,7 +457,7 @@ class ExternalSorterSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test("spilling with hash collisions") {
-    val conf = new SparkConf(true)
+    val conf = createSparkConf(true)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
 
@@ -503,7 +514,7 @@ class ExternalSorterSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test("spilling with many hash collisions") {
-    val conf = new SparkConf(true)
+    val conf = createSparkConf(true)
     conf.set("spark.shuffle.memoryFraction", "0.0001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
 
@@ -526,7 +537,7 @@ class ExternalSorterSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test("spilling with hash collisions using the Int.MaxValue key") {
-    val conf = new SparkConf(true)
+    val conf = createSparkConf(true)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
 
@@ -547,7 +558,7 @@ class ExternalSorterSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test("spilling with null keys and values") {
-    val conf = new SparkConf(true)
+    val conf = createSparkConf(true)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8e7d5ba1/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 2a71d7b..870343f 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -385,7 +385,7 @@ Apart from these, the following properties are also 
available, and may be useful
     When serializing using org.apache.spark.serializer.JavaSerializer, the 
serializer caches
     objects to prevent writing redundant data, however that stops garbage 
collection of those
     objects. By calling 'reset' you flush that info from the serializer, and 
allow old
-    objects to be collected. To turn off this periodic reset set it to a value 
&lt;= 0.
+    objects to be collected. To turn off this periodic reset set it to -1.
     By default it will reset the serializer every 100 objects.
   </td>
 </tr>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to