Repository: spark Updated Branches: refs/heads/branch-2.0 f441b9ac6 -> 8aa419b23
[SPARK-18280][CORE] Fix potential deadlock in `StandaloneSchedulerBackend.dead` ## What changes were proposed in this pull request? "StandaloneSchedulerBackend.dead" is called in a RPC thread, so it should not call "SparkContext.stop" in the same thread. "SparkContext.stop" will block until all RPC threads exit, if it's called inside a RPC thread, it will be dead-lock. This PR add a thread local flag inside RPC threads. `SparkContext.stop` uses it to decide if launching a new thread to stop the SparkContext. ## How was this patch tested? Jenkins Author: Shixiong Zhu <[email protected]> Closes #15775 from zsxwing/SPARK-18280. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8aa419b2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8aa419b2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8aa419b2 Branch: refs/heads/branch-2.0 Commit: 8aa419b23c24d151e438503e6c7c8503830b1b19 Parents: f441b9a Author: Shixiong Zhu <[email protected]> Authored: Tue Nov 8 13:14:56 2016 -0800 Committer: Shixiong Zhu <[email protected]> Committed: Tue Nov 8 13:16:00 2016 -0800 ---------------------------------------------------------------------- .../scala/org/apache/spark/SparkContext.scala | 22 ++++++++++++++++++-- .../scala/org/apache/spark/rpc/RpcEnv.scala | 4 ++++ .../org/apache/spark/rpc/netty/Dispatcher.scala | 1 + .../apache/spark/rpc/netty/NettyRpcEnv.scala | 3 +++ .../org/apache/spark/rpc/RpcEnvSuite.scala | 13 ++++++++++++ 5 files changed, 41 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/8aa419b2/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 43cec70..82e754b 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1734,8 +1734,26 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ def listJars(): Seq[String] = addedJars.keySet.toSeq - // Shut down the SparkContext. - def stop() { + /** + * Shut down the SparkContext. + */ + def stop(): Unit = { + if (env.rpcEnv.isInRPCThread) { + // `stop` will block until all RPC threads exit, so we cannot call stop inside a RPC thread. + // We should launch a new thread to call `stop` to avoid dead-lock. + new Thread("stop-spark-context") { + setDaemon(true) + + override def run(): Unit = { + _stop() + } + }.start() + } else { + _stop() + } + } + + private def _stop() { if (LiveListenerBus.withinListenerThread.value) { throw new SparkException( s"Cannot stop SparkContext within listener thread of ${LiveListenerBus.name}") http://git-wip-us.apache.org/repos/asf/spark/blob/8aa419b2/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala index 5668377..38af03c 100644 --- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala @@ -135,6 +135,10 @@ private[spark] abstract class RpcEnv(conf: SparkConf) { */ def openChannel(uri: String): ReadableByteChannel + /** + * Return if the current thread is a RPC thread. + */ + def isInRPCThread: Boolean } /** http://git-wip-us.apache.org/repos/asf/spark/blob/8aa419b2/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala index a02cf30..67baabd 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala @@ -201,6 +201,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging { /** Message loop used for dispatching messages. */ private class MessageLoop extends Runnable { override def run(): Unit = { + NettyRpcEnv.rpcThreadFlag.value = true try { while (true) { try { http://git-wip-us.apache.org/repos/asf/spark/blob/8aa419b2/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala index 89d2fb9..f8ca4d4 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala @@ -408,10 +408,13 @@ private[netty] class NettyRpcEnv( } + override def isInRPCThread: Boolean = NettyRpcEnv.rpcThreadFlag.value } private[netty] object NettyRpcEnv extends Logging { + private[netty] val rpcThreadFlag = new DynamicVariable[Boolean](false) + /** * When deserializing the [[NettyRpcEndpointRef]], it needs a reference to [[NettyRpcEnv]]. * Use `currentEnv` to wrap the deserialization codes. E.g., http://git-wip-us.apache.org/repos/asf/spark/blob/8aa419b2/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala index acdf21d..aa07059 100644 --- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala @@ -870,6 +870,19 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { verify(endpoint, never()).onDisconnected(any()) verify(endpoint, never()).onNetworkError(any(), any()) } + + test("isInRPCThread") { + val rpcEndpointRef = env.setupEndpoint("isInRPCThread", new RpcEndpoint { + override val rpcEnv = env + + override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { + case m => context.reply(rpcEnv.isInRPCThread) + } + }) + assert(rpcEndpointRef.askWithRetry[Boolean]("hello") === true) + assert(env.isInRPCThread === false) + env.stop(rpcEndpointRef) + } } class UnserializableClass --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
