This is an automated email from the ASF dual-hosted git repository.
tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3c34e45 [SPARK-31029] Avoid using global execution context in driver
main thread for YarnSchedulerBackend
3c34e45 is described below
commit 3c34e45df4b3e0610dde5334716025a85cbbc05b
Author: Shanyu Zhao <[email protected]>
AuthorDate: Fri Jun 19 09:59:14 2020 -0500
[SPARK-31029] Avoid using global execution context in driver main thread
for YarnSchedulerBackend
### What changes were proposed in this pull request?
In YarnSchedulerBackend, we should avoid using the global execution context
for its Futures. Otherwise, if the user's Spark application also uses the global
execution context for its Futures, the user faces nondeterministic behavior
with respect to the thread's context class loader.
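The idea can be sketched in plain Scala with a dedicated single-thread execution context. This sketch assumes plain JDK executors instead of Spark's internal `ThreadUtils`. The helper `daemonSingleThreadExecutor` and the thread name `scheduler-endpoint-sketch` are hypothetical names used only for illustration. Any `Future { ... }` created inside the object picks up the implicit dedicated context rather than `ExecutionContext.global`:

```scala
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object DedicatedEcSketch {
  // Hypothetical helper: one named daemon thread, loosely modelled on what
  // Spark's ThreadUtils.newDaemonSingleThreadExecutor provides.
  def daemonSingleThreadExecutor(name: String): ExecutorService =
    Executors.newSingleThreadExecutor(new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, name)
        t.setDaemon(true) // do not keep the JVM alive just for this pool
        t
      }
    })

  // Implicit in scope for this object, so Futures below use it instead of the global pool.
  implicit val endpointEC: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(daemonSingleThreadExecutor("scheduler-endpoint-sketch"))

  // Runs a trivial Future and reports which thread executed it.
  def runOnEndpoint(): String = {
    val f = Future { Thread.currentThread().getName }
    Await.result(f, 5.seconds)
  }
}
```

Because the pool's only thread is created by this object's own executor, its context class loader no longer depends on which application thread happened to touch the shared global pool first.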
### Why are the changes needed?
When running the TPC-DS test (https://github.com/databricks/spark-sql-perf),
we occasionally see an error related to a class not being found:
2020-02-04 20:00:26,673 ERROR yarn.ApplicationMaster: User class threw
exception: scala.ScalaReflectionException: class
com.databricks.spark.sql.perf.ExperimentRun in JavaMirror with
sun.misc.Launcher$AppClassLoader@28ba21f3 of type class
sun.misc.Launcher$AppClassLoader with classpath [...]
and parent being sun.misc.Launcher$ExtClassLoader@3ff5d147 of type class
sun.misc.Launcher$ExtClassLoader with classpath [...]
and parent being primordial classloader with boot classpath [...] not found.
This is the root cause of the problem:
The Spark driver starts the ApplicationMaster in the main thread, which starts a
user thread and sets a MutableURLClassLoader as that thread's ContextClassLoader:
userClassThread = startUserApplication()
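A simplified sketch of that setup follows. It assumes a plain `URLClassLoader` with no extra URLs as a stand-in for Spark's `MutableURLClassLoader`, which in the real driver would hold the user's jars. `startUserThread` is a hypothetical analogue of `startUserApplication()`, not Spark's code. The main thread keeps its own loader, and only the user thread sees the child loader:

```scala
import java.net.{URL, URLClassLoader}

object UserThreadSketch {
  // Stand-in for MutableURLClassLoader (assumption: no user jars added here).
  val userLoader: ClassLoader =
    new URLClassLoader(Array.empty[URL], Thread.currentThread().getContextClassLoader)

  // Hypothetical analogue of startUserApplication(): run the user's code on a
  // separate thread whose context class loader is the user loader.
  def startUserThread(body: () => Unit): Thread = {
    val t = new Thread(new Runnable { def run(): Unit = body() }, "user-class-thread-sketch")
    t.setContextClassLoader(userLoader) // only this thread gets the user loader
    t.start()
    t
  }
}
```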
The main thread then sets up the YarnSchedulerBackend RPC endpoints, which
handle these calls using Scala Futures on the default global ExecutionContext:
- doRequestTotalExecutors
- doKillExecutors
So between the main thread and the user thread, whichever starts a future first
gets to set the ContextClassLoader of the default thread pool's threads:
- If the main thread starts a future to handle doKillExecutors() before the user
thread does, the default thread pool threads' ContextClassLoader will be the
default one (AppClassLoader).
- If the user thread starts a future first, the thread pool threads will
have MutableURLClassLoader.
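Both cases follow from one JVM rule: a newly constructed `Thread` copies the context class loader of the thread that constructs it. This is our reading of the mechanism, not something the commit states. It also assumes that the global pool creates its worker threads lazily from whichever thread submits work. The sketch below checks only the inheritance rule. `loaderOfChildCreatedUnder` is a hypothetical helper:

```scala
import java.util.concurrent.atomic.AtomicReference

object CclInheritanceSketch {
  // Returns the context class loader observed by a thread that is *created by*
  // a thread whose context class loader is `creatorLoader`.
  def loaderOfChildCreatedUnder(creatorLoader: ClassLoader): ClassLoader = {
    val seen = new AtomicReference[ClassLoader]()
    val creator = new Thread(new Runnable {
      def run(): Unit = {
        // This constructor runs on `creator`, so `child` inherits creator's loader,
        // much like a lazily spawned pool worker would.
        val child = new Thread(new Runnable {
          def run(): Unit = seen.set(Thread.currentThread().getContextClassLoader)
        })
        child.start()
        child.join()
      }
    })
    creator.setContextClassLoader(creatorLoader)
    creator.start()
    creator.join()
    seen.get()
  }
}
```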
Note that only MutableURLClassLoader can load the user-provided classes of the
Spark app; if the ContextClassLoader is AppClassLoader, you will see errors
related to classes not being found.
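Here is a minimal sketch of how that failure surfaces when code resolves classes through the current thread's context class loader. This is a generic JDK illustration. It is not the exact lookup path used by spark-sql-perf or by the Scala runtime mirror in the log above. The class name `com.example.NotOnClasspath` used in the check is hypothetical:

```scala
import scala.util.Try

object LoadViaContextLoaderSketch {
  // Resolve a class by name through the current thread's context class loader.
  // If that loader cannot see the class (e.g. AppClassLoader for a class that
  // only lives in a user jar), this fails with ClassNotFoundException.
  def loadWithContextLoader(className: String): Try[Class[_]] =
    Try(Class.forName(className, false, Thread.currentThread().getContextClassLoader))
}
```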
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Existing unit tests and manual tests
Closes #27843 from shanyu/shanyu-31029.
Authored-by: Shanyu Zhao <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>
---
.../apache/spark/scheduler/cluster/YarnSchedulerBackend.scala | 11 +++++++++--
1 file changed, 9 insertions(+), 2 deletions(-)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index e428bab..0475b0a 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -21,8 +21,7 @@ import java.util.EnumSet
import java.util.concurrent.atomic.{AtomicBoolean}
import javax.servlet.DispatcherType
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.Future
+import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import scala.util.control.NonFatal
@@ -67,6 +66,14 @@ private[spark] abstract class YarnSchedulerBackend(
private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
+ /**
+ * Declare implicit single thread execution context for futures doRequestTotalExecutors and
+ * doKillExecutors below, avoiding using the global execution context that may cause conflict
+ * with user code's execution of futures.
+ */
+ private implicit val schedulerEndpointEC = ExecutionContext.fromExecutorService(
+ ThreadUtils.newDaemonSingleThreadExecutor("yarn-scheduler-endpoint"))
+
/** Application ID. */
protected var appId: Option[ApplicationId] = None