Repository: spark Updated Branches: refs/heads/master 30345c43b -> f5d18af6a
[SPARK-14958][CORE] Failed task not handled when there's error deserializing failure reason ## What changes were proposed in this pull request? TaskResultGetter tries to deserialize the TaskEndReason before handling the failed task. If an error is thrown during deserialization, the failed task won't be handled, which leaves the job hanging. The PR proposes to handle the failed task in a finally block. ## How was this patch tested? In my case I hit a NoClassDefFoundError and the job hung. I manually verified that the patch fixes it. Author: Rui Li <[email protected]> Author: Rui Li <[email protected]> Author: Rui Li <[email protected]> Closes #12775 from lirui-intel/SPARK-14958. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5d18af6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5d18af6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5d18af6 Branch: refs/heads/master Commit: f5d18af6a8a0b9f8c2e9677f9d8ae1712eb701c6 Parents: 30345c4 Author: Rui Li <[email protected]> Authored: Thu Jan 5 14:51:13 2017 -0800 Committer: Kay Ousterhout <[email protected]> Committed: Thu Jan 5 14:51:13 2017 -0800 ---------------------------------------------------------------------- .../spark/scheduler/TaskResultGetter.scala | 6 +++++- .../spark/scheduler/TaskResultGetterSuite.scala | 21 +++++++++++++++++++- 2 files changed, 25 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/f5d18af6/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala index b1addc1..a284f79 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala @@ -143,8 +143,12 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul logError( "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader) case ex: Exception => // No-op + } finally { + // If there's an error while deserializing the TaskEndReason, this Runnable + // will die. Still tell the scheduler about the task failure, to avoid a hang + // where the scheduler thinks the task is still running. + scheduler.handleFailedTask(taskSetManager, tid, taskState, reason) } - scheduler.handleFailedTask(taskSetManager, tid, taskState, reason) } }) } catch { http://git-wip-us.apache.org/repos/asf/spark/blob/f5d18af6/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala index c9e682f..3e55d39 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.scheduler -import java.io.File +import java.io.{File, ObjectInputStream} import java.net.URL import java.nio.ByteBuffer @@ -248,5 +248,24 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local assert(resSizeAfter.exists(_.toString.toLong > 0L)) } + test("failed task is handled when error occurs deserializing the reason") { + sc = new SparkContext("local", "test", conf) + val rdd = sc.parallelize(Seq(1), 1).map { _ => + throw new UndeserializableException + } + val message = intercept[SparkException] { + rdd.collect() + }.getMessage + // Job failed, even though the failure reason is unknown. 
+ val unknownFailure = """(?s).*Lost task.*: UnknownReason.*""".r + assert(unknownFailure.findFirstMatchIn(message).isDefined) + } + +} + +private class UndeserializableException extends Exception { + private def readObject(in: ObjectInputStream): Unit = { + throw new NoClassDefFoundError() + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
