This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git
The following commit(s) were added to refs/heads/master by this push:
new 8c6f60b7 [LIVY-896] Livy not capture spark-submit error exit if timing is right (#358)
8c6f60b7 is described below
commit 8c6f60b74633f25c211ca248ba290de65aee89f9
Author: Jeff Xu <[email protected]>
AuthorDate: Tue Nov 8 01:39:15 2022 -0800
[LIVY-896] Livy not capture spark-submit error exit if timing is right (#358)
---
.../scala/org/apache/livy/utils/SparkYarnApp.scala | 67 ++++++++++-------
.../org/apache/livy/utils/SparkYarnAppSpec.scala | 86 ++++++++++++++++++++++
2 files changed, 127 insertions(+), 26 deletions(-)
diff --git a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
index a245823e..dd551c20 100644
--- a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
+++ b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
@@ -164,6 +164,10 @@ class SparkYarnApp private[utils] (
}
}
+ private def isProcessAlive(): Boolean = {
+ process.isDefined && process.get.isAlive
+ }
+
private def isProcessErrExit(): Boolean = {
process.isDefined && !process.get.isAlive && process.get.exitValue() != 0
}
@@ -282,34 +286,45 @@ class SparkYarnApp private[utils] (
try {
Clock.sleep(pollInterval.toMillis)
- // Refresh application state
- val appReport = yarnClient.getApplicationReport(appId)
- yarnDiagnostics = getYarnDiagnostics(appReport)
- changeState(mapYarnState(
- appReport.getApplicationId,
- appReport.getYarnApplicationState,
- appReport.getFinalApplicationStatus))
-
- if (isProcessErrExit()) {
- if (killed) {
- changeState(SparkApp.State.KILLED)
- } else {
- changeState(SparkApp.State.FAILED)
+ if (!isProcessAlive()) {
+ // Refresh application state
+ val appReport = yarnClient.getApplicationReport(appId)
+ yarnDiagnostics = getYarnDiagnostics(appReport)
+
+ // figure out the application's actual state and update in a
single operation
+ val sessState =
+ if (isProcessErrExit()) {
+ if (killed) {
+ debug(s"sess state: process killed")
+ SparkApp.State.KILLED
+ } else {
+ debug(s"sess state: process err exited")
+ SparkApp.State.FAILED
+ }
+ }
+ else {
+ val yarnState = mapYarnState(
+ appReport.getApplicationId,
+ appReport.getYarnApplicationState,
+ appReport.getFinalApplicationStatus)
+ debug(s"sess state: yarn=${yarnState}")
+ yarnState
+ }
+ changeState(sessState)
+
+ val latestAppInfo = {
+ val attempt =
+
yarnClient.getApplicationAttemptReport(appReport.getCurrentApplicationAttemptId)
+ val driverLogUrl =
+
Try(yarnClient.getContainerReport(attempt.getAMContainerId).getLogUrl)
+ .toOption
+ AppInfo(driverLogUrl, Option(appReport.getTrackingUrl))
}
- }
-
- val latestAppInfo = {
- val attempt =
-
yarnClient.getApplicationAttemptReport(appReport.getCurrentApplicationAttemptId)
- val driverLogUrl =
-
Try(yarnClient.getContainerReport(attempt.getAMContainerId).getLogUrl)
- .toOption
- AppInfo(driverLogUrl, Option(appReport.getTrackingUrl))
- }
- if (appInfo != latestAppInfo) {
- listener.foreach(_.infoChanged(latestAppInfo))
- appInfo = latestAppInfo
+ if (appInfo != latestAppInfo) {
+ listener.foreach(_.infoChanged(latestAppInfo))
+ appInfo = latestAppInfo
+ }
}
} catch {
// This exception might be thrown during app is starting up. It's
transient.
diff --git a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
index ddd97674..17a78320 100644
--- a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
+++ b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
@@ -269,6 +269,92 @@ class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite {
}
}
+ // LIVY-896
+ it("should end with state failed when spark submit failed but Yarn reports
SUCCESS") {
+ Clock.withSleepMethod(mockSleep) {
+ val diag = "DIAG"
+ val mockYarnClient = mock[YarnClient]
+
+ val mockAppReport = mock[ApplicationReport]
+ when(mockAppReport.getApplicationId).thenReturn(appId)
+ when(mockAppReport.getDiagnostics).thenReturn(diag)
+
when(mockAppReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED)
+
when(mockAppReport.getYarnApplicationState).thenReturn(YarnApplicationState.FINISHED)
+
when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport)
+
+ val mockSparkSubmit = mock[LineBufferedProcess]
+ when(mockSparkSubmit.isAlive).thenReturn(false)
+ when(mockSparkSubmit.exitValue).thenReturn(-1)
+
+ val listener = new SparkAppListener {
+ var nStateChanged = 0
+ override def stateChanged(oldState: SparkApp.State, newState:
SparkApp.State): Unit = {
+ nStateChanged += 1
+ }
+
+ }
+ val app = new SparkYarnApp(
+ appTag,
+ None,
+ Some(mockSparkSubmit),
+ Some(listener),
+ livyConf,
+ mockYarnClient)
+
+ cleanupThread(app.yarnAppMonitorThread) {
+ app.yarnAppMonitorThread.join(TEST_TIMEOUT.toMillis)
+ assert(!app.yarnAppMonitorThread.isAlive,
+ "YarnAppMonitorThread should terminate after YARN app is
finished.")
+ assert(app.state == SparkApp.State.FAILED,
+ "SparkYarnApp should end with state failed when spark submit
failed")
+ assert(listener.nStateChanged == 1,
+ "SparkYarnApp should make state change only once.")
+ }
+ }
+ }
+
+ // LIVY-896
+ it("should never change state if spark submit not finished") {
+ Clock.withSleepMethod(mockSleep) {
+ val diag = "DIAG"
+ val mockYarnClient = mock[YarnClient]
+
+ val mockAppReport = mock[ApplicationReport]
+ when(mockAppReport.getApplicationId).thenReturn(appId)
+ when(mockAppReport.getDiagnostics).thenReturn(diag)
+
when(mockAppReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED)
+
when(mockAppReport.getYarnApplicationState).thenReturn(YarnApplicationState.FINISHED)
+
when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport)
+
+ val mockSparkSubmit = mock[LineBufferedProcess]
+ when(mockSparkSubmit.isAlive).thenReturn(true)
+
+ val listener = new SparkAppListener {
+ var nStateChanged = 0
+ override def stateChanged(oldState: SparkApp.State, newState:
SparkApp.State): Unit = {
+ nStateChanged += 1
+ }
+
+ }
+ val app = new SparkYarnApp(
+ appTag,
+ None,
+ Some(mockSparkSubmit),
+ Some(listener),
+ livyConf,
+ mockYarnClient)
+
+ cleanupThread(app.yarnAppMonitorThread) {
+ // 2 seconds is sufficient - yarn poll interval should be 200ms
+ app.yarnAppMonitorThread.join((2 seconds).toMillis)
+ assert(app.yarnAppMonitorThread.isAlive,
+ "YarnAppMonitorThread should not terminate if spark-submit is
still alive.")
+ assert(listener.nStateChanged == 0,
+ "SparkYarnApp should not make state change.")
+ }
+ }
+ }
+
it("should map YARN state to SparkApp.State correctly") {
val app = new SparkYarnApp(appTag, appIdOption, None, None, livyConf)
cleanupThread(app.yarnAppMonitorThread) {