Repository: spark Updated Branches: refs/heads/master fe12277b4 -> 672f46766
[SPARK-8057][Core]Call TaskAttemptContext.getTaskAttemptID using Reflection Some users run the Spark core jar from the Maven repo with Hadoop 1. SPARK-2075 has already resolved the compatibility issue to support this, but `SparkHadoopMapRedUtil.commitTask` recently broke it again. This PR uses reflection to call `TaskAttemptContext.getTaskAttemptID` to fix the compatibility issue. Author: zsxwing <[email protected]> Closes #6599 from zsxwing/SPARK-8057 and squashes the following commits: f7a343c [zsxwing] Remove the redundant import 6b7f1af [zsxwing] Call TaskAttemptContext.getTaskAttemptID using Reflection Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/672f4676 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/672f4676 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/672f4676 Branch: refs/heads/master Commit: 672f467668da1cf20895ee57652489c306120288 Parents: fe12277 Author: zsxwing <[email protected]> Authored: Thu Aug 6 21:42:42 2015 -0700 Committer: Reynold Xin <[email protected]> Committed: Thu Aug 6 21:42:42 2015 -0700 ---------------------------------------------------------------------- .../org/apache/spark/deploy/SparkHadoopUtil.scala | 14 ++++++++++++++ .../apache/spark/mapred/SparkHadoopMapRedUtil.scala | 3 ++- 2 files changed, 16 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/672f4676/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index e06b06e..7e9dba4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -34,6 +34,8 @@ import 
org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapreduce.JobContext +import org.apache.hadoop.mapreduce.{TaskAttemptContext => MapReduceTaskAttemptContext} +import org.apache.hadoop.mapreduce.{TaskAttemptID => MapReduceTaskAttemptID} import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.spark.annotation.DeveloperApi @@ -195,6 +197,18 @@ class SparkHadoopUtil extends Logging { } /** + * Using reflection to call `getTaskAttemptID` from TaskAttemptContext. If we directly + * call `TaskAttemptContext.getTaskAttemptID`, it will generate different byte codes + * for Hadoop 1.+ and Hadoop 2.+ because TaskAttemptContext is class in Hadoop 1.+ + * while it's interface in Hadoop 2.+. + */ + def getTaskAttemptIDFromTaskAttemptContext( + context: MapReduceTaskAttemptContext): MapReduceTaskAttemptID = { + val method = context.getClass.getMethod("getTaskAttemptID") + method.invoke(context).asInstanceOf[MapReduceTaskAttemptID] + } + + /** * Get [[FileStatus]] objects for all leaf children (files) under the given base path. If the * given path points to a file, return a single-element collection containing [[FileStatus]] of * that file. 
http://git-wip-us.apache.org/repos/asf/spark/blob/672f4676/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala index 87df427..f405b73 100644 --- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala +++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala @@ -24,6 +24,7 @@ import org.apache.hadoop.mapred._ import org.apache.hadoop.mapreduce.{TaskAttemptContext => MapReduceTaskAttemptContext} import org.apache.hadoop.mapreduce.{OutputCommitter => MapReduceOutputCommitter} +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.CommitDeniedException import org.apache.spark.{Logging, SparkEnv, TaskContext} import org.apache.spark.util.{Utils => SparkUtils} @@ -93,7 +94,7 @@ object SparkHadoopMapRedUtil extends Logging { splitId: Int, attemptId: Int): Unit = { - val mrTaskAttemptID = mrTaskContext.getTaskAttemptID + val mrTaskAttemptID = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(mrTaskContext) // Called after we have decided to commit def performCommit(): Unit = { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
