This is an automated email from the ASF dual-hosted git repository.
tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 20051eb [SPARK-36540][YARN] YARN-CLIENT mode should check Shutdown
message when AMEndpoint disconnected
20051eb is described below
commit 20051eb69904de6afc27fe5adb18bcc760c78701
Author: Angerszhuuuu <[email protected]>
AuthorDate: Mon Oct 11 08:24:49 2021 -0500
[SPARK-36540][YARN] YARN-CLIENT mode should check Shutdown message when
AMEndpoint disconnected
### What changes were proposed in this pull request?
We encountered a case where the AM lost its connection to the driver:
```
21/08/18 02:14:15 ERROR TransportRequestHandler: Error sending result RpcResponse{requestId=5675952834716124039, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=47 cap=64]}} to xx.xx.xx.xx:41420; closing connection
java.nio.channels.ClosedChannelException
    at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
    at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1104)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
```
Looking at the client-mode code, when the AMEndpoint is disconnected, the AM
finishes the application with a final status of SUCCESS:
```
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
  // In cluster mode or unmanaged am case, do not rely on the disassociated event to exit
  // This avoids potentially reporting incorrect exit codes if the driver fails
  if (!(isClusterMode || sparkConf.get(YARN_UNMANAGED_AM))) {
    logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
    finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
  }
}
```
Normally, in client mode, when the application succeeds the driver stops and
the AM loses its connection, so exiting with SUCCESS is fine. But if a network
problem causes the disconnect, finishing with a final status of SUCCESS is not
correct. YarnClientSchedulerBackend then receives an application report with a
final status of SUCCESS and stops the SparkContext, so the application fails
but is marked as a normal stop.
```
private class MonitorThread extends Thread {
  private var allowInterrupt = true

  override def run() {
    try {
      val YarnAppReport(_, state, diags) =
        client.monitorApplication(appId.get, logApplicationReport = false)
      logError(s"YARN application has exited unexpectedly with state $state! " +
        "Check the YARN application logs for more details.")
      diags.foreach { err =>
        logError(s"Diagnostics message: $err")
      }
      allowInterrupt = false
      sc.stop()
    } catch {
      case e: InterruptedException => logInfo("Interrupting monitor thread")
    }
  }

  def stopMonitor(): Unit = {
    if (allowInterrupt) {
      this.interrupt()
    }
  }
}
```
IMO, the driver should send a `Shutdown` message to the AM in yarn-client mode
so that the AM can tell a clean shutdown from an unexpected disconnect.
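
The handshake described above can be sketched as a small self-contained model. This is only an illustration under stated assumptions, not the patch itself: `AmEndpointModel`, `FinalStatus`, `onShutdownMessage`, and `treatDisconnectAsFailed` are hypothetical names standing in for the AM endpoint, YARN's final application status, the receipt of `Shutdown`, and a config switch. No Spark or YARN classes are used.

```scala
// Hypothetical stand-ins for YARN's final application status.
sealed trait FinalStatus
case object Succeeded extends FinalStatus
case object Failed extends FinalStatus

// Minimal model of the AM-side endpoint (not the real Spark class).
final class AmEndpointModel(treatDisconnectAsFailed: Boolean) {
  // Set when the driver announces a clean stop before the connection drops.
  @volatile private var shutdown = false

  def onShutdownMessage(): Unit = { shutdown = true }

  // A disconnect counts as success if it was announced, or if the
  // stricter behavior is switched off.
  def onDisconnected(): FinalStatus =
    if (shutdown || !treatDisconnectAsFailed) Succeeded else Failed
}

object ShutdownHandshakeDemo {
  def main(args: Array[String]): Unit = {
    // Clean stop: Shutdown arrives first, then the connection drops.
    val clean = new AmEndpointModel(treatDisconnectAsFailed = true)
    clean.onShutdownMessage()
    println(clean.onDisconnected()) // prints "Succeeded"

    // Unclean disconnect (for example a network failure): no Shutdown seen.
    val unclean = new AmEndpointModel(treatDisconnectAsFailed = true)
    println(unclean.onDisconnected()) // prints "Failed"

    // Switch off: every disconnect is reported as success, as before.
    val legacy = new AmEndpointModel(treatDisconnectAsFailed = false)
    println(legacy.onDisconnected()) // prints "Succeeded"
  }
}
```

Keeping the switch off by default in this model preserves the old behavior, which matters because a driver that is killed abruptly never gets to send `Shutdown` and would otherwise be reported as failed.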
### Why are the changes needed?
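Fixes a bug: in yarn-client mode, an AM that loses its connection to the
driver unexpectedly still finishes with a final status of SUCCESS.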
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Closes #33780 from AngersZhuuuu/SPARK-36540.
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>
---
docs/running-on-yarn.md | 13 +++++++++++++
.../org/apache/spark/deploy/yarn/ApplicationMaster.scala | 15 +++++++++++++--
.../main/scala/org/apache/spark/deploy/yarn/config.scala | 15 +++++++++++++++
.../scheduler/cluster/YarnClientSchedulerBackend.scala | 1 +
.../spark/scheduler/cluster/YarnSchedulerBackend.scala | 13 +++++++++++--
5 files changed, 53 insertions(+), 4 deletions(-)
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 37ff479..8b7ed18 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -442,6 +442,19 @@ To use a custom metrics.properties for the application master and executors, upd
<td>1.6.0</td>
</tr>
<tr>
+ <td><code>spark.yarn.am.clientModeTreatDisconnectAsFailed</code></td>
+ <td>false</td>
+ <td>
+ Treat yarn-client unclean disconnects as failures. In yarn-client mode, normally the application will always finish
+ with a final status of SUCCESS because in some cases, it is not possible to know if the Application was terminated
+ intentionally by the user or if there was a real error. This config changes that behavior such that if the Application
+ Master disconnects from the driver uncleanly (ie without the proper shutdown handshake) the application will
+ terminate with a final status of FAILED. This will allow the caller to decide if it was truly a failure. Note that if
+ this config is set and the user just terminate the client application badly it may show a status of FAILED when it wasn't really FAILED.
+ </td>
+ <td>3.3.0</td>
+</tr>
+<tr>
<td><code>spark.yarn.am.clientModeExitOnError</code></td>
<td>false</td>
<td>
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 1c62d0a..a25edb0 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -784,6 +784,9 @@ private[spark] class ApplicationMaster(
*/
private class AMEndpoint(override val rpcEnv: RpcEnv, driver: RpcEndpointRef)
extends RpcEndpoint with Logging {
+ @volatile private var shutdown = false
+ private val clientModeTreatDisconnectAsFailed =
+ sparkConf.get(AM_CLIENT_MODE_TREAT_DISCONNECT_AS_FAILED)
override def onStart(): Unit = {
driver.send(RegisterClusterManager(self))
@@ -801,6 +804,8 @@ private[spark] class ApplicationMaster(
override def receive: PartialFunction[Any, Unit] = {
case UpdateDelegationTokens(tokens) =>
SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
+
+ case Shutdown => shutdown = true
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -843,8 +848,13 @@ private[spark] class ApplicationMaster(
// In cluster mode or unmanaged am case, do not rely on the disassociated event to exit
// This avoids potentially reporting incorrect exit codes if the driver fails
if (!(isClusterMode || sparkConf.get(YARN_UNMANAGED_AM))) {
- logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
- finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
+ if (shutdown || !clientModeTreatDisconnectAsFailed) {
+ logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
+ finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
+ } else {
+ logError(s"Application Master lost connection with driver! Shutting down. $remoteAddress")
+ finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_DISCONNECTED)
+ }
}
}
}
@@ -862,6 +872,7 @@ object ApplicationMaster extends Logging {
private val EXIT_SECURITY = 14
private val EXIT_EXCEPTION_USER_CLASS = 15
private val EXIT_EARLY = 16
+ private val EXIT_DISCONNECTED = 17
private var master: ApplicationMaster = _
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index ab2063c..1270f1e 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -52,6 +52,21 @@ package object config extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createOptional
+ private[spark] val AM_CLIENT_MODE_TREAT_DISCONNECT_AS_FAILED =
+ ConfigBuilder("spark.yarn.am.clientModeTreatDisconnectAsFailed")
+ .doc("Treat yarn-client unclean disconnects as failures. In yarn-client mode, normally the " +
+ "application will always finish with a final status of SUCCESS because in some cases, " +
+ "it is not possible to know if the Application was terminated intentionally by the user " +
+ "or if there was a real error. This config changes that behavior such that " +
+ "if the Application Master disconnects from the driver uncleanly (ie without the proper" +
+ " shutdown handshake) the application will terminate with a final status of FAILED. " +
+ "This will allow the caller to decide if it was truly a failure. Note that " +
+ "if this config is set and the user just terminate the client application badly " +
+ "it may show a status of FAILED when it wasn't really FAILED.")
+ .version("3.3.0")
+ .booleanConf
+ .createWithDefault(false)
+
private[spark] val AM_CLIENT_MODE_EXIT_ON_ERROR =
ConfigBuilder("spark.yarn.am.clientModeExitOnError")
.doc("In yarn-client mode, when this is true, if driver got " +
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 28c8652..1b70e40 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -161,6 +161,7 @@ private[spark] class YarnClientSchedulerBackend(
*/
override def stop(): Unit = {
assert(client != null, "Attempted to stop this scheduler before starting it!")
+ yarnSchedulerEndpoint.handleClientModeDriverStop()
if (monitorThread != null) {
monitorThread.stopMonitor()
}
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index c5c4594..c3aea37 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -58,7 +58,7 @@ private[spark] abstract class YarnSchedulerBackend(
protected var totalExpectedExecutors = 0
- private val yarnSchedulerEndpoint = new YarnSchedulerEndpoint(rpcEnv)
+ protected val yarnSchedulerEndpoint = new YarnSchedulerEndpoint(rpcEnv)
protected var amEndpoint: Option[RpcEndpointRef] = None
private val yarnSchedulerEndpointRef = rpcEnv.setupEndpoint(
@@ -291,7 +291,7 @@ private[spark] abstract class YarnSchedulerBackend(
/**
* An [[RpcEndpoint]] that communicates with the ApplicationMaster.
*/
- private class YarnSchedulerEndpoint(override val rpcEnv: RpcEnv)
+ protected class YarnSchedulerEndpoint(override val rpcEnv: RpcEnv)
extends ThreadSafeRpcEndpoint with Logging {
private[YarnSchedulerBackend] def handleExecutorDisconnectedFromDriver(
@@ -319,6 +319,15 @@ private[spark] abstract class YarnSchedulerBackend(
removeExecutorMessage.foreach { message => driverEndpoint.send(message) }
}
+ private[cluster] def handleClientModeDriverStop(): Unit = {
+ amEndpoint match {
+ case Some(am) =>
+ am.send(Shutdown)
+ case None =>
+ logWarning("Attempted to send shutdown message before the AM has registered!")
+ }
+ }
+
override def receive: PartialFunction[Any, Unit] = {
case RegisterClusterManager(am) =>
logInfo(s"ApplicationMaster registered as $am")