This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
     new 932ff58  [ZEPPELIN-4669]. spark staging dir is not deleted in yarn cluster mode
932ff58 is described below

commit 932ff58f5e4811463d64c997de9bff252a0ab4a4
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Fri Mar 6 12:52:07 2020 +0800

    [ZEPPELIN-4669]. spark staging dir is not deleted in yarn cluster mode

    ### What is this PR for?
    This PR deletes the YARN staging directory in yarn-cluster mode before stopping the SparkContext.

    ### What type of PR is it?
    [Improvement]

    ### Todos
    * [ ] - Task

    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4669

    ### How should this be tested?
    * Manually tested: run the Spark interpreter in yarn-cluster mode and verify that the staging folder is deleted after the Spark interpreter is closed.

    ### Screenshots (if appropriate)

    ### Questions:
    * Do the license files need to be updated? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No

    Author: Jeff Zhang <zjf...@apache.org>

    Closes #3680 from zjffdu/ZEPPELIN-4669 and squashes the following commits:

    075479a6c [Jeff Zhang] [ZEPPELIN-4669]. spark staging dir is not deleted in yarn cluster mode
---
 spark/spark-scala-parent/pom.xml                   |  2 +-
 .../zeppelin/spark/BaseSparkScalaInterpreter.scala | 28 +++++++++++++++++++++-
 2 files changed, 28 insertions(+), 2 deletions(-)

diff --git a/spark/spark-scala-parent/pom.xml b/spark/spark-scala-parent/pom.xml
index 509af0b..7b2a589 100644
--- a/spark/spark-scala-parent/pom.xml
+++ b/spark/spark-scala-parent/pom.xml
@@ -34,7 +34,7 @@
     <name>Zeppelin: Spark Scala Parent</name>

     <properties>
-        <spark.version>2.4.0</spark.version>
+        <spark.version>2.4.4</spark.version>
         <spark.scala.binary.version>2.11</spark.scala.binary.version>
         <spark.scala.version>2.11.12</spark.scala.version>
         <saprk.scala.compile.version>${spark.scala.binary.version}</saprk.scala.compile.version>
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
index 2d6a1da..994c7ca 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
@@ -18,7 +18,7 @@
 package org.apache.zeppelin.spark

-import java.io.File
+import java.io.{File, IOException}
 import java.net.{URL, URLClassLoader}
 import java.nio.file.Paths
 import java.util.concurrent.atomic.AtomicInteger
@@ -27,6 +27,8 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.yarn.client.api.YarnClient
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.ConverterUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
@@ -178,6 +180,18 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
     bind(name, tpe, value, modifier.asScala.toList)

   protected def close(): Unit = {
+    // delete stagingDir for yarn mode
+    if (conf.get("spark.master").startsWith("yarn")) {
+      val hadoopConf = new YarnConfiguration()
+      val appStagingBaseDir = if (conf.contains("spark.yarn.stagingDir")) {
+        new Path(conf.get("spark.yarn.stagingDir"))
+      } else {
+        FileSystem.get(hadoopConf).getHomeDirectory()
+      }
+      val stagingDirPath =
+        new Path(appStagingBaseDir, ".sparkStaging" + "/" + sc.applicationId)
+      cleanupStagingDirInternal(stagingDirPath, hadoopConf)
+    }
+
     if (sparkHttpServer != null) {
       sparkHttpServer.getClass.getMethod("stop").invoke(sparkHttpServer)
     }
@@ -192,6 +206,18 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
     sqlContext = null
   }

+  private def cleanupStagingDirInternal(stagingDirPath: Path, hadoopConf: Configuration): Unit = {
+    try {
+      val fs = stagingDirPath.getFileSystem(hadoopConf)
+      if (fs.delete(stagingDirPath, true)) {
+        LOGGER.info(s"Deleted staging directory $stagingDirPath")
+      }
+    } catch {
+      case ioe: IOException =>
+        LOGGER.warn("Failed to cleanup staging dir " + stagingDirPath, ioe)
+    }
+  }
+
   protected def createSparkContext(): Unit = {
     if (isSparkSessionPresent()) {
       spark2CreateContext()
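
For readers who want to study or try the patched logic outside Zeppelin, below is a minimal, self-contained sketch of the same cleanup. It is not Zeppelin's code: the object name `StagingDirCleanup` is hypothetical, `println`/`System.err` stand in for Zeppelin's `LOGGER`, and it assumes a live `SparkContext` running against YARN with the Hadoop configuration files on the classpath.

```scala
import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.SparkContext

object StagingDirCleanup {

  /** Delete .sparkStaging/<appId> for a YARN-backed SparkContext,
    * mirroring the cleanup added to close() in the patch above. */
  def cleanup(sc: SparkContext): Unit = {
    val conf = sc.getConf
    // Only YARN mode uses a staging directory.
    if (conf.get("spark.master", "").startsWith("yarn")) {
      val hadoopConf: Configuration = new YarnConfiguration()
      // spark.yarn.stagingDir, if set, overrides the default base
      // (the submitting user's home directory on HDFS).
      val appStagingBaseDir =
        if (conf.contains("spark.yarn.stagingDir")) new Path(conf.get("spark.yarn.stagingDir"))
        else FileSystem.get(hadoopConf).getHomeDirectory()
      val stagingDirPath = new Path(appStagingBaseDir, ".sparkStaging/" + sc.applicationId)
      try {
        // Resolve the filesystem from the path itself, so a fully
        // qualified stagingDir (e.g. hdfs://nn:8020/...) is honored.
        val fs = stagingDirPath.getFileSystem(hadoopConf)
        if (fs.delete(stagingDirPath, true)) {
          println(s"Deleted staging directory $stagingDirPath")
        }
      } catch {
        // Deletion failure is logged, not rethrown, so shutdown can proceed.
        case ioe: IOException =>
          System.err.println(s"Failed to cleanup staging dir $stagingDirPath: ${ioe.getMessage}")
      }
    }
  }
}
```

Calling `StagingDirCleanup.cleanup(sc)` just before `sc.stop()` reproduces the ordering the patch establishes in `close()`: the staging directory is removed while the SparkContext, and therefore `sc.applicationId`, is still available.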