Repository: spark
Updated Branches:
  refs/heads/branch-1.6 fc35fb351 -> c1bde2a92


[SPARK-11996][CORE] Make the executor thread dump work again

In the previous implementation, the driver needs to know the executor listening 
address to send the thread dump request. However, in Netty RPC, the executor 
doesn't listen to any port, so the executor thread dump feature is broken.

This patch makes the driver use the endpointRef stored in 
BlockManagerMasterEndpoint to send the thread dump request to fix it.

Author: Shixiong Zhu <[email protected]>

Closes #9976 from zsxwing/executor-thread-dump.

(cherry picked from commit 0c1e72e7f79231e537299b57a1ab7cd843171923)
Signed-off-by: Reynold Xin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1bde2a9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1bde2a9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1bde2a9

Branch: refs/heads/branch-1.6
Commit: c1bde2a92ebb4768dc6ede8ebc8dfaacc571bf44
Parents: fc35fb3
Author: Shixiong Zhu <[email protected]>
Authored: Thu Nov 26 18:56:22 2015 -0800
Committer: Reynold Xin <[email protected]>
Committed: Thu Nov 26 18:56:41 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 10 ++---
 .../org/apache/spark/executor/Executor.scala    |  5 ---
 .../spark/executor/ExecutorEndpoint.scala       | 43 --------------------
 .../spark/storage/BlockManagerMaster.scala      |  4 +-
 .../storage/BlockManagerMasterEndpoint.scala    | 12 +++---
 .../spark/storage/BlockManagerMessages.scala    |  7 +++-
 .../storage/BlockManagerSlaveEndpoint.scala     |  7 +++-
 project/MimaExcludes.scala                      |  8 ++++
 8 files changed, 29 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c1bde2a9/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 2c10779..b030d3c 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -48,20 +48,20 @@ import org.apache.mesos.MesosNativeLibrary
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
-import org.apache.spark.executor.{ExecutorEndpoint, TriggerThreadDump}
 import org.apache.spark.input.{StreamInputFormat, PortableDataStream, 
WholeTextFileInputFormat,
   FixedLengthBinaryInputFormat}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
-import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
+import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
   SparkDeploySchedulerBackend, SimrSchedulerBackend}
 import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, 
MesosSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage._
+import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
 import org.apache.spark.ui.{SparkUI, ConsoleProgressBar}
 import org.apache.spark.ui.jobs.JobProgressListener
 import org.apache.spark.util._
@@ -619,11 +619,7 @@ class SparkContext(config: SparkConf) extends Logging with 
ExecutorAllocationCli
       if (executorId == SparkContext.DRIVER_IDENTIFIER) {
         Some(Utils.getThreadDump())
       } else {
-        val (host, port) = 
env.blockManager.master.getRpcHostPortForExecutor(executorId).get
-        val endpointRef = env.rpcEnv.setupEndpointRef(
-          SparkEnv.executorActorSystemName,
-          RpcAddress(host, port),
-          ExecutorEndpoint.EXECUTOR_ENDPOINT_NAME)
+        val endpointRef = 
env.blockManager.master.getExecutorEndpointRef(executorId).get
         
Some(endpointRef.askWithRetry[Array[ThreadStackTrace]](TriggerThreadDump))
       }
     } catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bde2a9/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 9e88d48..6154f06 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -85,10 +85,6 @@ private[spark] class Executor(
     env.blockManager.initialize(conf.getAppId)
   }
 
-  // Create an RpcEndpoint for receiving RPCs from the driver
-  private val executorEndpoint = env.rpcEnv.setupEndpoint(
-    ExecutorEndpoint.EXECUTOR_ENDPOINT_NAME, new ExecutorEndpoint(env.rpcEnv, 
executorId))
-
   // Whether to load classes in user jars before those in Spark jars
   private val userClassPathFirst = 
conf.getBoolean("spark.executor.userClassPathFirst", false)
 
@@ -136,7 +132,6 @@ private[spark] class Executor(
 
   def stop(): Unit = {
     env.metricsSystem.report()
-    env.rpcEnv.stop(executorEndpoint)
     heartbeater.shutdown()
     heartbeater.awaitTermination(10, TimeUnit.SECONDS)
     threadPool.shutdown()

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bde2a9/core/src/main/scala/org/apache/spark/executor/ExecutorEndpoint.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/executor/ExecutorEndpoint.scala 
b/core/src/main/scala/org/apache/spark/executor/ExecutorEndpoint.scala
deleted file mode 100644
index cf362f8..0000000
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorEndpoint.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.executor
-
-import org.apache.spark.rpc.{RpcEnv, RpcCallContext, RpcEndpoint}
-import org.apache.spark.util.Utils
-
-/**
- * Driver -> Executor message to trigger a thread dump.
- */
-private[spark] case object TriggerThreadDump
-
-/**
- * [[RpcEndpoint]] that runs inside of executors to enable driver -> executor 
RPC.
- */
-private[spark]
-class ExecutorEndpoint(override val rpcEnv: RpcEnv, executorId: String) 
extends RpcEndpoint {
-
-  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, 
Unit] = {
-    case TriggerThreadDump =>
-      context.reply(Utils.getThreadDump())
-  }
-
-}
-
-object ExecutorEndpoint {
-  val EXECUTOR_ENDPOINT_NAME = "ExecutorEndpoint"
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bde2a9/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index f45bff3..440c4c1 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -87,8 +87,8 @@ class BlockManagerMaster(
     driverEndpoint.askWithRetry[Seq[BlockManagerId]](GetPeers(blockManagerId))
   }
 
-  def getRpcHostPortForExecutor(executorId: String): Option[(String, Int)] = {
-    driverEndpoint.askWithRetry[Option[(String, 
Int)]](GetRpcHostPortForExecutor(executorId))
+  def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
+    
driverEndpoint.askWithRetry[Option[RpcEndpointRef]](GetExecutorEndpointRef(executorId))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bde2a9/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 7db6035..41892b4 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -19,7 +19,6 @@ package org.apache.spark.storage
 
 import java.util.{HashMap => JHashMap}
 
-import scala.collection.immutable.HashSet
 import scala.collection.mutable
 import scala.collection.JavaConverters._
 import scala.concurrent.{ExecutionContext, Future}
@@ -75,8 +74,8 @@ class BlockManagerMasterEndpoint(
     case GetPeers(blockManagerId) =>
       context.reply(getPeers(blockManagerId))
 
-    case GetRpcHostPortForExecutor(executorId) =>
-      context.reply(getRpcHostPortForExecutor(executorId))
+    case GetExecutorEndpointRef(executorId) =>
+      context.reply(getExecutorEndpointRef(executorId))
 
     case GetMemoryStatus =>
       context.reply(memoryStatus)
@@ -388,15 +387,14 @@ class BlockManagerMasterEndpoint(
   }
 
   /**
-   * Returns the hostname and port of an executor, based on the [[RpcEnv]] 
address of its
-   * [[BlockManagerSlaveEndpoint]].
+   * Returns an [[RpcEndpointRef]] of the [[BlockManagerSlaveEndpoint]] for 
sending RPC messages.
    */
-  private def getRpcHostPortForExecutor(executorId: String): Option[(String, 
Int)] = {
+  private def getExecutorEndpointRef(executorId: String): 
Option[RpcEndpointRef] = {
     for (
       blockManagerId <- blockManagerIdByExecutor.get(executorId);
       info <- blockManagerInfo.get(blockManagerId)
     ) yield {
-      (info.slaveEndpoint.address.host, info.slaveEndpoint.address.port)
+      info.slaveEndpoint
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bde2a9/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 376e9eb..f392a4a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -42,6 +42,11 @@ private[spark] object BlockManagerMessages {
   case class RemoveBroadcast(broadcastId: Long, removeFromDriver: Boolean = 
true)
     extends ToBlockManagerSlave
 
+  /**
+   * Driver -> Executor message to trigger a thread dump.
+   */
+  case object TriggerThreadDump extends ToBlockManagerSlave
+
   
//////////////////////////////////////////////////////////////////////////////////
   // Messages from slaves to the master.
   
//////////////////////////////////////////////////////////////////////////////////
@@ -90,7 +95,7 @@ private[spark] object BlockManagerMessages {
 
   case class GetPeers(blockManagerId: BlockManagerId) extends 
ToBlockManagerMaster
 
-  case class GetRpcHostPortForExecutor(executorId: String) extends 
ToBlockManagerMaster
+  case class GetExecutorEndpointRef(executorId: String) extends 
ToBlockManagerMaster
 
   case class RemoveExecutor(execId: String) extends ToBlockManagerMaster
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bde2a9/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
index e749631..9eca902 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
@@ -19,10 +19,10 @@ package org.apache.spark.storage
 
 import scala.concurrent.{ExecutionContext, Future}
 
-import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcEnv, RpcCallContext, 
RpcEndpoint}
-import org.apache.spark.util.ThreadUtils
 import org.apache.spark.{Logging, MapOutputTracker, SparkEnv}
+import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
 import org.apache.spark.storage.BlockManagerMessages._
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
  * An RpcEndpoint to take commands from the master to execute options. For 
example,
@@ -70,6 +70,9 @@ class BlockManagerSlaveEndpoint(
 
     case GetMatchingBlockIds(filter, _) =>
       context.reply(blockManager.getMatchingBlockIds(filter))
+
+    case TriggerThreadDump =>
+      context.reply(Utils.getThreadDump())
   }
 
   private def doAsync[T](actionMessage: String, context: RpcCallContext)(body: 
=> T) {

http://git-wip-us.apache.org/repos/asf/spark/blob/c1bde2a9/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 54a9ad9..566bfe8 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -147,6 +147,14 @@ object MimaExcludes {
         // SPARK-4557 Changed foreachRDD to use VoidFunction
         ProblemFilters.exclude[MissingMethodProblem](
           "org.apache.spark.streaming.api.java.JavaDStreamLike.foreachRDD")
+      ) ++ Seq(
+        // SPARK-11996 Make the executor thread dump work again
+        
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.executor.ExecutorEndpoint"),
+        
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.executor.ExecutorEndpoint$"),
+        ProblemFilters.exclude[MissingClassProblem](
+          
"org.apache.spark.storage.BlockManagerMessages$GetRpcHostPortForExecutor"),
+        ProblemFilters.exclude[MissingClassProblem](
+          
"org.apache.spark.storage.BlockManagerMessages$GetRpcHostPortForExecutor$")
       )
     case v if v.startsWith("1.5") =>
       Seq(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to