Repository: spark Updated Branches: refs/heads/branch-1.4 c15d0f40f -> bdf3dccde
[SPARK-8057][Core] Call TaskAttemptContext.getTaskAttemptID using Reflection Someone may use the Spark core jar in the Maven repo with Hadoop 1. SPARK-2075 has already resolved the compatibility issue to support it. However, `SparkHadoopMapRedUtil.commitTask` recently broke it. This PR uses reflection to call `TaskAttemptContext.getTaskAttemptID` to fix the compatibility issue. Author: zsxwing <[email protected]> Closes #6599 from zsxwing/SPARK-8057 and squashes the following commits: f7a343c [zsxwing] Remove the redundant import 6b7f1af [zsxwing] Call TaskAttemptContext.getTaskAttemptID using Reflection Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bdf3dccd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bdf3dccd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bdf3dccd Branch: refs/heads/branch-1.4 Commit: bdf3dccde090e4b037d2e3623cb91ced6f570725 Parents: c15d0f4 Author: zsxwing <[email protected]> Authored: Thu Aug 6 21:42:42 2015 -0700 Committer: Josh Rosen <[email protected]> Committed: Fri Aug 28 08:31:10 2015 -0700 ---------------------------------------------------------------------- .../org/apache/spark/deploy/SparkHadoopUtil.scala | 14 ++++++++++++++ .../apache/spark/mapred/SparkHadoopMapRedUtil.scala | 3 ++- 2 files changed, 16 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/bdf3dccd/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 7fa75ac..9574840 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -33,6 +33,8 @@ import 
org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapreduce.JobContext +import org.apache.hadoop.mapreduce.{TaskAttemptContext => MapReduceTaskAttemptContext} +import org.apache.hadoop.mapreduce.{TaskAttemptID => MapReduceTaskAttemptID} import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.spark.annotation.DeveloperApi @@ -194,6 +196,18 @@ class SparkHadoopUtil extends Logging { } /** + * Using reflection to call `getTaskAttemptID` from TaskAttemptContext. If we directly + * call `TaskAttemptContext.getTaskAttemptID`, it will generate different byte codes + * for Hadoop 1.+ and Hadoop 2.+ because TaskAttemptContext is class in Hadoop 1.+ + * while it's interface in Hadoop 2.+. + */ + def getTaskAttemptIDFromTaskAttemptContext( + context: MapReduceTaskAttemptContext): MapReduceTaskAttemptID = { + val method = context.getClass.getMethod("getTaskAttemptID") + method.invoke(context).asInstanceOf[MapReduceTaskAttemptID] + } + + /** * Get [[FileStatus]] objects for all leaf children (files) under the given base path. If the * given path points to a file, return a single-element collection containing [[FileStatus]] of * that file. 
http://git-wip-us.apache.org/repos/asf/spark/blob/bdf3dccd/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala index 818f7a4..f0eb52e 100644 --- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala +++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala @@ -24,6 +24,7 @@ import org.apache.hadoop.mapred._ import org.apache.hadoop.mapreduce.{TaskAttemptContext => MapReduceTaskAttemptContext} import org.apache.hadoop.mapreduce.{OutputCommitter => MapReduceOutputCommitter} +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.CommitDeniedException import org.apache.spark.{Logging, SparkEnv, TaskContext} @@ -92,7 +93,7 @@ object SparkHadoopMapRedUtil extends Logging { splitId: Int, attemptId: Int): Unit = { - val mrTaskAttemptID = mrTaskContext.getTaskAttemptID + val mrTaskAttemptID = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(mrTaskContext) // Called after we have decided to commit def performCommit(): Unit = { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
