Repository: spark
Updated Branches:
  refs/heads/branch-1.4 c15d0f40f -> bdf3dccde


[SPARK-8057][Core]Call TaskAttemptContext.getTaskAttemptID using Reflection

Some users may use the Spark core jar from the Maven repository with Hadoop 1. 
SPARK-2075 already resolved the compatibility issue to support this, but 
`SparkHadoopMapRedUtil.commitTask` recently broke it again.

This PR uses Reflection to call `TaskAttemptContext.getTaskAttemptID` to fix 
the compatibility issue.

Author: zsxwing <[email protected]>

Closes #6599 from zsxwing/SPARK-8057 and squashes the following commits:

f7a343c [zsxwing] Remove the redundant import
6b7f1af [zsxwing] Call TaskAttemptContext.getTaskAttemptID using Reflection


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bdf3dccd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bdf3dccd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bdf3dccd

Branch: refs/heads/branch-1.4
Commit: bdf3dccde090e4b037d2e3623cb91ced6f570725
Parents: c15d0f4
Author: zsxwing <[email protected]>
Authored: Thu Aug 6 21:42:42 2015 -0700
Committer: Josh Rosen <[email protected]>
Committed: Fri Aug 28 08:31:10 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/SparkHadoopUtil.scala     | 14 ++++++++++++++
 .../apache/spark/mapred/SparkHadoopMapRedUtil.scala   |  3 ++-
 2 files changed, 16 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bdf3dccd/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 7fa75ac..9574840 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -33,6 +33,8 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, 
PathFilter}
 import 
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.JobContext
+import org.apache.hadoop.mapreduce.{TaskAttemptContext => 
MapReduceTaskAttemptContext}
+import org.apache.hadoop.mapreduce.{TaskAttemptID => MapReduceTaskAttemptID}
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 
 import org.apache.spark.annotation.DeveloperApi
@@ -194,6 +196,18 @@ class SparkHadoopUtil extends Logging {
   }
 
   /**
+   * Using reflection to call `getTaskAttemptID` from TaskAttemptContext. If 
we directly
+   * call `TaskAttemptContext.getTaskAttemptID`, it will generate different 
byte codes
+   * for Hadoop 1.+ and Hadoop 2.+ because TaskAttemptContext is a class in 
Hadoop 1.+
+   * while it's an interface in Hadoop 2.+.
+   */
+  def getTaskAttemptIDFromTaskAttemptContext(
+      context: MapReduceTaskAttemptContext): MapReduceTaskAttemptID = {
+    val method = context.getClass.getMethod("getTaskAttemptID")
+    method.invoke(context).asInstanceOf[MapReduceTaskAttemptID]
+  }
+
+  /**
    * Get [[FileStatus]] objects for all leaf children (files) under the given 
base path. If the
    * given path points to a file, return a single-element collection 
containing [[FileStatus]] of
    * that file.

http://git-wip-us.apache.org/repos/asf/spark/blob/bdf3dccd/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala 
b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index 818f7a4..f0eb52e 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.mapred._
 import org.apache.hadoop.mapreduce.{TaskAttemptContext => 
MapReduceTaskAttemptContext}
 import org.apache.hadoop.mapreduce.{OutputCommitter => 
MapReduceOutputCommitter}
 
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.CommitDeniedException
 import org.apache.spark.{Logging, SparkEnv, TaskContext}
 
@@ -92,7 +93,7 @@ object SparkHadoopMapRedUtil extends Logging {
       splitId: Int,
       attemptId: Int): Unit = {
 
-    val mrTaskAttemptID = mrTaskContext.getTaskAttemptID
+    val mrTaskAttemptID = 
SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(mrTaskContext)
 
     // Called after we have decided to commit
     def performCommit(): Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to