This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 932ff58  [ZEPPELIN-4669]. spark staging dir is not deleted in yarn cluster mode
932ff58 is described below

commit 932ff58f5e4811463d64c997de9bff252a0ab4a4
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Fri Mar 6 12:52:07 2020 +0800

    [ZEPPELIN-4669]. spark staging dir is not deleted in yarn cluster mode
    
    ### What is this PR for?
    
    This PR deletes the staging directory in yarn-cluster mode before stopping
    the SparkContext.
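    
    In sketch form (mirroring the change in the diff below, with explanatory
    comments added; conf is the interpreter's SparkConf, sc the active
    SparkContext, and cleanupStagingDirInternal the helper introduced by this
    commit):
    
        import org.apache.hadoop.fs.{FileSystem, Path}
        import org.apache.hadoop.yarn.conf.YarnConfiguration
    
        if (conf.get("spark.master").startsWith("yarn")) {
          val hadoopConf = new YarnConfiguration()
          // Honor spark.yarn.stagingDir if set; otherwise fall back to the
          // default filesystem's home directory, as Spark itself does.
          val appStagingBaseDir = if (conf.contains("spark.yarn.stagingDir")) {
            new Path(conf.get("spark.yarn.stagingDir"))
          } else {
            FileSystem.get(hadoopConf).getHomeDirectory()
          }
          // Per-application staging dir, e.g. ~/.sparkStaging/application_..._...
          val stagingDirPath = new Path(appStagingBaseDir, ".sparkStaging/" + sc.applicationId)
          cleanupStagingDirInternal(stagingDirPath, hadoopConf)
        }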
    
    ### What type of PR is it?
    [Improvement]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4669
    
    ### How should this be tested?
    * Manually tested: run the Spark interpreter in yarn-cluster mode and verify
    that the staging folder is deleted after closing the Spark interpreter (a
    sketch of such a check follows).
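    
    A minimal manual check from a Scala REPL on the cluster (hypothetical
    snippet; appId stands in for the YARN application id of the closed
    interpreter, and the default filesystem is assumed to be the cluster HDFS
    with HADOOP_CONF_DIR on the classpath):
    
        import org.apache.hadoop.conf.Configuration
        import org.apache.hadoop.fs.{FileSystem, Path}
    
        val appId = "application_1583470000000_0001"  // hypothetical app id
        val fs = FileSystem.get(new Configuration())   // picks up HADOOP_CONF_DIR
        val staging = new Path(fs.getHomeDirectory, ".sparkStaging/" + appId)
        assert(!fs.exists(staging), s"staging dir $staging was not deleted")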
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No.
    * Are there breaking changes for older versions? No.
    * Does this need documentation? No.
    
    Author: Jeff Zhang <zjf...@apache.org>
    
    Closes #3680 from zjffdu/ZEPPELIN-4669 and squashes the following commits:
    
    075479a6c [Jeff Zhang] [ZEPPELIN-4669]. spark staging dir is not deleted in yarn cluster mode
---
 spark/spark-scala-parent/pom.xml                   |  2 +-
 .../zeppelin/spark/BaseSparkScalaInterpreter.scala | 28 +++++++++++++++++++++-
 2 files changed, 28 insertions(+), 2 deletions(-)

diff --git a/spark/spark-scala-parent/pom.xml b/spark/spark-scala-parent/pom.xml
index 509af0b..7b2a589 100644
--- a/spark/spark-scala-parent/pom.xml
+++ b/spark/spark-scala-parent/pom.xml
@@ -34,7 +34,7 @@
     <name>Zeppelin: Spark Scala Parent</name>
 
     <properties>
-        <spark.version>2.4.0</spark.version>
+        <spark.version>2.4.4</spark.version>
         <spark.scala.binary.version>2.11</spark.scala.binary.version>
         <spark.scala.version>2.11.12</spark.scala.version>
         <saprk.scala.compile.version>${spark.scala.binary.version}</saprk.scala.compile.version>
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
index 2d6a1da..994c7ca 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
@@ -18,7 +18,7 @@
 package org.apache.zeppelin.spark
 
 
-import java.io.File
+import java.io.{File, IOException}
 import java.net.{URL, URLClassLoader}
 import java.nio.file.Paths
 import java.util.concurrent.atomic.AtomicInteger
@@ -27,6 +27,8 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.yarn.client.api.YarnClient
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.ConverterUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
@@ -178,6 +180,18 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
     bind(name, tpe, value, modifier.asScala.toList)
 
   protected def close(): Unit = {
+    // delete stagingDir for yarn mode
+    if (conf.get("spark.master").startsWith("yarn")) {
+      val hadoopConf = new YarnConfiguration()
+      val appStagingBaseDir = if (conf.contains("spark.yarn.stagingDir")) {
+        new Path(conf.get("spark.yarn.stagingDir"))
+      } else {
+        FileSystem.get(hadoopConf).getHomeDirectory()
+      }
+      val stagingDirPath = new Path(appStagingBaseDir, ".sparkStaging" + "/" + sc.applicationId)
+      cleanupStagingDirInternal(stagingDirPath, hadoopConf)
+    }
+
     if (sparkHttpServer != null) {
       sparkHttpServer.getClass.getMethod("stop").invoke(sparkHttpServer)
     }
@@ -192,6 +206,18 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
     sqlContext = null
   }
 
+  private def cleanupStagingDirInternal(stagingDirPath: Path, hadoopConf: Configuration): Unit = {
+    try {
+      val fs = stagingDirPath.getFileSystem(hadoopConf)
+      if (fs.delete(stagingDirPath, true)) {
+        LOGGER.info(s"Deleted staging directory $stagingDirPath")
+      }
+    } catch {
+      case ioe: IOException =>
+        LOGGER.warn("Failed to cleanup staging dir " + stagingDirPath, ioe)
+    }
+  }
+
   protected def createSparkContext(): Unit = {
     if (isSparkSessionPresent()) {
       spark2CreateContext()
