Repository: spark
Updated Branches:
  refs/heads/master 348d7c9a9 -> 8074208fa


[SPARK-10611] Clone Configuration for each task for NewHadoopRDD

This patch attempts to fix the Hadoop Configuration thread safety issue for 
NewHadoopRDD in the same way SPARK-2546 fixed the issue for HadoopRDD.

Author: Mingyu Kim <[email protected]>

Closes #8763 from mingyukim/mkim/SPARK-10611.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8074208f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8074208f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8074208f

Branch: refs/heads/master
Commit: 8074208fa47fa654c1055c48cfa0d923edeeb04f
Parents: 348d7c9
Author: Mingyu Kim <[email protected]>
Authored: Fri Sep 18 15:40:58 2015 -0700
Committer: Josh Rosen <[email protected]>
Committed: Fri Sep 18 15:40:58 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/rdd/BinaryFileRDD.scala    |  5 +--
 .../org/apache/spark/rdd/NewHadoopRDD.scala     | 37 ++++++++++++++++----
 2 files changed, 34 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8074208f/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
index 6fec00d..aedced7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -34,12 +34,13 @@ private[spark] class BinaryFileRDD[T](
 
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
+    val conf = getConf
     inputFormat match {
       case configurable: Configurable =>
-        configurable.setConf(getConf)
+        configurable.setConf(conf)
       case _ =>
     }
-    val jobContext = newJobContext(getConf, jobId)
+    val jobContext = newJobContext(conf, jobId)
     inputFormat.setMinPartitions(jobContext, minPartitions)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Partition](rawSplits.size)

http://git-wip-us.apache.org/repos/asf/spark/blob/8074208f/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 174979a..2872b93 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -44,7 +44,6 @@ private[spark] class NewHadoopPartition(
   extends Partition {
 
   val serializableHadoopSplit = new SerializableWritable(rawSplit)
-
   override def hashCode(): Int = 41 * (41 + rddId) + index
 }
 
@@ -84,6 +83,27 @@ class NewHadoopRDD[K, V](
 
   @transient protected val jobId = new JobID(jobTrackerId, id)
 
+  private val shouldCloneJobConf = 
sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
+
+  def getConf: Configuration = {
+    val conf: Configuration = confBroadcast.value.value
+    if (shouldCloneJobConf) {
+      // Hadoop Configuration objects are not thread-safe, which may lead to 
various problems if
+      // one job modifies a configuration while another reads it (SPARK-2546, 
SPARK-10611).  This
+      // problem occurs somewhat rarely because most jobs treat the 
configuration as though it's
+      // immutable.  One solution, implemented here, is to clone the 
Configuration object.
+      // Unfortunately, this clone can be very expensive.  To avoid unexpected 
performance
+      // regressions for workloads and Hadoop versions that do not suffer from 
these thread-safety
+      // issues, this cloning is disabled by default.
+      NewHadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+        logDebug("Cloning Hadoop Configuration")
+        new Configuration(conf)
+      }
+    } else {
+      conf
+    }
+  }
+
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
     inputFormat match {
@@ -104,7 +124,7 @@ class NewHadoopRDD[K, V](
     val iter = new Iterator[(K, V)] {
       val split = theSplit.asInstanceOf[NewHadoopPartition]
       logInfo("Input split: " + split.serializableHadoopSplit)
-      val conf = confBroadcast.value.value
+      val conf = getConf
 
       val inputMetrics = context.taskMetrics
         .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
@@ -230,12 +250,16 @@ class NewHadoopRDD[K, V](
     super.persist(storageLevel)
   }
 
-
-  def getConf: Configuration = confBroadcast.value.value
 }
 
 private[spark] object NewHadoopRDD {
   /**
+   * Configuration's constructor is not threadsafe (see SPARK-1097 and 
HADOOP-10456).
+   * Therefore, we synchronize on this lock before calling new Configuration().
+   */
+  val CONFIGURATION_INSTANTIATION_LOCK = new Object()
+
+  /**
    * Analogous to [[org.apache.spark.rdd.MapPartitionsRDD]], but passes in an 
InputSplit to
    * the given function rather than the index of the partition.
    */
@@ -268,12 +292,13 @@ private[spark] class WholeTextFileRDD(
 
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
+    val conf = getConf
     inputFormat match {
       case configurable: Configurable =>
-        configurable.setConf(getConf)
+        configurable.setConf(conf)
       case _ =>
     }
-    val jobContext = newJobContext(getConf, jobId)
+    val jobContext = newJobContext(conf, jobId)
     inputFormat.setMinPartitions(jobContext, minPartitions)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Partition](rawSplits.size)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to