Repository: spark
Updated Branches:
  refs/heads/branch-1.4 3b4f9ce85 -> 2904d3f8b


[SPARK-7766] KryoSerializerInstance reuse is unsafe when auto-reset is disabled

SPARK-3386 / #5606 modified the shuffle write path to re-use serializer 
instances across multiple calls to DiskBlockObjectWriter. It turns out that 
this introduced a very rare bug when using `KryoSerializer`: if auto-reset is 
disabled and reference-tracking is enabled, then we'll end up re-using the same 
serializer instance to write multiple output streams without calling `reset()` 
between write calls, which can lead to cases where objects in one file may 
contain references to objects that are in previous files, causing errors during 
deserialization.

This patch fixes this bug by calling `reset()` at the start of `serialize()` 
and `serializeStream()`. I also added a regression test which demonstrates that 
this problem only occurs when auto-reset is disabled and reference-tracking is 
enabled.

Author: Josh Rosen <[email protected]>

Closes #6293 from JoshRosen/kryo-instance-reuse-bug and squashes the following 
commits:

e19726d [Josh Rosen] Add fix for SPARK-7766.
71845e3 [Josh Rosen] Add failing regression test to trigger Kryo re-use bug

(cherry picked from commit eac00691da93a94e6cff5ae0f8952e5724e78094)
Signed-off-by: Josh Rosen <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2904d3f8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2904d3f8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2904d3f8

Branch: refs/heads/branch-1.4
Commit: 2904d3f8bddad65ebdf0d91be6110f81f35aeb0f
Parents: 3b4f9ce
Author: Josh Rosen <[email protected]>
Authored: Fri May 22 13:28:14 2015 -0700
Committer: Josh Rosen <[email protected]>
Committed: Fri May 22 13:29:02 2015 -0700

----------------------------------------------------------------------
 .../spark/serializer/KryoSerializer.scala       |  2 ++
 .../spark/serializer/KryoSerializerSuite.scala  | 33 ++++++++++++++++++++
 2 files changed, 35 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2904d3f8/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 64ba27f..2179579 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -177,6 +177,7 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ
 
   override def serialize[T: ClassTag](t: T): ByteBuffer = {
     output.clear()
+    kryo.reset() // We must reset in case this serializer instance was reused (see SPARK-7766)
     try {
       kryo.writeClassAndObject(output, t)
     } catch {
@@ -202,6 +203,7 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ
   }
 
   override def serializeStream(s: OutputStream): SerializationStream = {
+    kryo.reset() // We must reset in case this serializer instance was reused (see SPARK-7766)
     new KryoSerializationStream(kryo, s)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2904d3f8/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index c7369de..0bd91a8 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.serializer
 
+import java.io.ByteArrayOutputStream
+
 import scala.collection.mutable
 import scala.reflect.ClassTag
 
@@ -319,6 +321,37 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
     val ser2 = new KryoSerializer(conf).newInstance().asInstanceOf[KryoSerializerInstance]
     assert(!ser2.getAutoReset)
   }
+
+  private def testSerializerInstanceReuse(autoReset: Boolean, referenceTracking: Boolean): Unit = {
+    val conf = new SparkConf(loadDefaults = false)
+      .set("spark.kryo.referenceTracking", referenceTracking.toString)
+    if (!autoReset) {
+      conf.set("spark.kryo.registrator", classOf[RegistratorWithoutAutoReset].getName)
+    }
+    val ser = new KryoSerializer(conf)
+    val serInstance = ser.newInstance().asInstanceOf[KryoSerializerInstance]
+    assert (serInstance.getAutoReset() === autoReset)
+    val obj = ("Hello", "World")
+    def serializeObjects(): Array[Byte] = {
+      val baos = new ByteArrayOutputStream()
+      val serStream = serInstance.serializeStream(baos)
+      serStream.writeObject(obj)
+      serStream.writeObject(obj)
+      serStream.close()
+      baos.toByteArray
+    }
+    val output1: Array[Byte] = serializeObjects()
+    val output2: Array[Byte] = serializeObjects()
+    assert (output1 === output2)
+  }
+
+  // Regression test for SPARK-7766, an issue where disabling auto-reset and enabling
+  // reference-tracking would lead to corrupted output when serializer instances are re-used
+  for (referenceTracking <- Set(true, false); autoReset <- Set(true, false)) {
+    test(s"instance reuse with autoReset = $autoReset, referenceTracking = $referenceTracking") {
+      testSerializerInstanceReuse(autoReset = autoReset, referenceTracking = referenceTracking)
+    }
+  }
 }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to