Repository: spark
Updated Branches:
  refs/heads/master 7104ee0e5 -> 2ea0f2e11


[SPARK-9585] Delete the input format caching because some input format are non 
thread safe

If we cache the  InputFormat, all tasks on the same executor will share it.
Some InputFormat is thread safety, but some are not, such as 
HiveHBaseTableInputFormat. If tasks share a non thread safe InputFormat, 
unexpected error may be occurs.
To avoid it, I think we should delete the input format  caching.

Author: xutingjun <[email protected]>
Author: meiyoula <[email protected]>
Author: Xutingjun <[email protected]>

Closes #7918 from XuTingjun/cached_inputFormat.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2ea0f2e1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2ea0f2e1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2ea0f2e1

Branch: refs/heads/master
Commit: 2ea0f2e11b82ef4817c7e6a162ea23da7860b893
Parents: 7104ee0
Author: xutingjun <[email protected]>
Authored: Tue Sep 22 11:01:32 2015 -0700
Committer: Reynold Xin <[email protected]>
Committed: Tue Sep 22 11:01:32 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 6 ------
 1 file changed, 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2ea0f2e1/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 8f2655d..77b5713 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -182,17 +182,11 @@ class HadoopRDD[K, V](
   }
 
   protected def getInputFormat(conf: JobConf): InputFormat[K, V] = {
-    if (HadoopRDD.containsCachedMetadata(inputFormatCacheKey)) {
-      return 
HadoopRDD.getCachedMetadata(inputFormatCacheKey).asInstanceOf[InputFormat[K, V]]
-    }
-    // Once an InputFormat for this RDD is created, cache it so that only one 
reflection call is
-    // done in each local process.
     val newInputFormat = 
ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
       .asInstanceOf[InputFormat[K, V]]
     if (newInputFormat.isInstanceOf[Configurable]) {
       newInputFormat.asInstanceOf[Configurable].setConf(conf)
     }
-    HadoopRDD.putCachedMetadata(inputFormatCacheKey, newInputFormat)
     newInputFormat
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to