This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e945a9031d4e [SPARK-50783] Canonicalize JVM profiler results file name 
and layout on DFS
e945a9031d4e is described below

commit e945a9031d4eb4f9893f9b1a744646c2cfb214c0
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Jan 13 19:21:34 2025 -0800

    [SPARK-50783] Canonicalize JVM profiler results file name and layout on DFS
    
    ### What changes were proposed in this pull request?
    
    This PR canonicalizes the JVM profiler added in SPARK-46094 profiling 
result files on DFS to
    ```
    dfsDir/{{APP_ID}}/profile-exec-{{EXECUTOR_ID}}.jfr
    ```
    which majorly follows the event logs file name pattern and layout.
    
    ### Why are the changes needed?
    
    According to 
https://github.com/apache/spark/pull/44021#issuecomment-1863873954, we can 
integrate the profiling results with Spark UI (both live and history) in the 
future, so it's good to follow the event logs file name pattern and layout as 
much as possible.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, it's an unreleased feature.
    
    ### How was this patch tested?
    
    ```
    $ bin/spark-submit run-example \
      --master yarn \
      --deploy-mode cluster \
      --conf 
spark.plugins=org.apache.spark.executor.profiler.ExecutorProfilerPlugin \
      --conf spark.executor.profiling.enabled=true \
      --conf spark.executor.profiling.dfsDir=hdfs:///spark-profiling \
      --conf spark.executor.profiling.fraction=1 \
      SparkPi 100000
    ```
    
    ```
    hadoopspark-dev1:~/spark$ hadoop fs -ls /spark-profiling/
    Found 1 items
    drwxrwx---   - hadoop supergroup          0 2025-01-13 10:29 
/spark-profiling/application_1736320707252_0023_1
    ```
    ```
    hadoopspark-dev1:~/spark$ hadoop fs -ls 
/spark-profiling/application_1736320707252_0023_1
    Found 48 items
    -rw-rw----   3 hadoop supergroup    5255028 2025-01-13 10:29 
/spark-profiling/application_1736320707252_0023_1/profile-exec-1.jfr
    -rw-rw----   3 hadoop supergroup    3840775 2025-01-13 10:29 
/spark-profiling/application_1736320707252_0023_1/profile-exec-10.jfr
    -rw-rw----   3 hadoop supergroup    3889002 2025-01-13 10:29 
/spark-profiling/application_1736320707252_0023_1/profile-exec-11.jfr
    -rw-rw----   3 hadoop supergroup    3570697 2025-01-13 10:29 
/spark-profiling/application_1736320707252_0023_1/profile-exec-12.jfr
    ...
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #49440 from pan3793/SPARK-50783.
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 connector/profiler/README.md                       |  6 +-
 .../executor/profiler/ExecutorJVMProfiler.scala    | 77 +++++++++++++---------
 2 files changed, 50 insertions(+), 33 deletions(-)

diff --git a/connector/profiler/README.md b/connector/profiler/README.md
index 1326fd55df09..4d97b15eb96a 100644
--- a/connector/profiler/README.md
+++ b/connector/profiler/README.md
@@ -16,7 +16,7 @@ The profiler writes the jfr files to the executor's working 
directory in the exe
 Code profiling is currently only supported for
 
 *   Linux (x64)
-*   Linux (arm 64)
+*   Linux (arm64)
 *   Linux (musl, x64)
 *   MacOS
 
@@ -54,7 +54,7 @@ Then enable the profiling in the configuration.
   <td><code>spark.executor.profiling.dfsDir</code></td>
   <td>(none)</td>
   <td>
-      An HDFS compatible path to which the profiler's output files are copied. 
The output files will be written as 
<i>dfsDir/application_id/profile-appname-exec-executor_id.jfr</i> <br/>
+      An HDFS compatible path to which the profiler's output files are copied. 
The output files will be written as 
<i>dfsDir/{{APP_ID}}/profile-exec-{{EXECUTOR_ID}}.jfr</i> <br/>
       If no <i>dfsDir</i> is specified then the files are not copied over. 
Users should ensure there is sufficient disk space available otherwise it may 
lead to corrupt jfr files.
   </td>
   <td>4.0.0</td>
@@ -72,7 +72,7 @@ Then enable the profiling in the configuration.
   <td>event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s</td>
   <td>
       Options to pass to the profiler. Detailed options are documented in the 
comments here:
-      <a 
href="https://github.com/async-profiler/async-profiler/blob/32601bccd9e49adda9510a2ed79d142ac6ef0ff9/src/arguments.cpp#L52";>Profiler
 arguments</a>.  
+      <a 
href="https://github.com/async-profiler/async-profiler/blob/v3.0/src/arguments.cpp#L44";>Profiler
 arguments</a>.  
        Note that the options to start, stop, specify output format, and output 
file do not have to be specified.
   </td>
   <td>4.0.0</td>
diff --git 
a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala
 
b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala
index 20b6db5221fa..94e5b46c6588 100644
--- 
a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala
+++ 
b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala
@@ -17,17 +17,17 @@
 package org.apache.spark.executor.profiler
 
 import java.io.{BufferedInputStream, FileInputStream, InputStream, IOException}
-import java.net.URI
 import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
 
 import one.profiler.{AsyncProfiler, AsyncProfilerLoader}
 import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys.PATH
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 
 /**
@@ -38,15 +38,26 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, 
executorId: String) ex
   private var running = false
   private val enableProfiler = conf.get(EXECUTOR_PROFILING_ENABLED)
   private val profilerOptions = conf.get(EXECUTOR_PROFILING_OPTIONS)
-  private val profilerDfsDir = conf.get(EXECUTOR_PROFILING_DFS_DIR)
+  private val profilerDfsDirOpt = conf.get(EXECUTOR_PROFILING_DFS_DIR)
   private val profilerLocalDir = conf.get(EXECUTOR_PROFILING_LOCAL_DIR)
   private val writeInterval = conf.get(EXECUTOR_PROFILING_WRITE_INTERVAL)
 
-  private val startcmd = 
s"start,$profilerOptions,file=$profilerLocalDir/profile.jfr"
-  private val stopcmd = 
s"stop,$profilerOptions,file=$profilerLocalDir/profile.jfr"
-  private val dumpcmd = 
s"dump,$profilerOptions,file=$profilerLocalDir/profile.jfr"
-  private val resumecmd = 
s"resume,$profilerOptions,file=$profilerLocalDir/profile.jfr"
+  private val appId = try {
+    conf.getAppId
+  } catch {
+    case _: NoSuchElementException => "local-" + System.currentTimeMillis
+  }
+  private val appAttemptId = conf.getOption("spark.app.attempt.id")
+  private val baseName = Utils.nameForAppAndAttempt(appId, appAttemptId)
+  private val profileFile = s"profile-exec-$executorId.jfr"
+
+  private val startcmd = 
s"start,$profilerOptions,file=$profilerLocalDir/$profileFile"
+  private val stopcmd = 
s"stop,$profilerOptions,file=$profilerLocalDir/$profileFile"
+  private val dumpcmd = 
s"dump,$profilerOptions,file=$profilerLocalDir/$profileFile"
+  private val resumecmd = 
s"resume,$profilerOptions,file=$profilerLocalDir/$profileFile"
 
+  private val PROFILER_FOLDER_PERMISSIONS = new 
FsPermission(Integer.parseInt("770", 8).toShort)
+  private val PROFILER_FILE_PERMISSIONS = new 
FsPermission(Integer.parseInt("660", 8).toShort)
   private val UPLOAD_SIZE = 8 * 1024 * 1024 // 8 MB
   private var outputStream: FSDataOutputStream = _
   private var inputStream: InputStream = _
@@ -89,28 +100,34 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, 
executorId: String) ex
     }
   }
 
+  private def requireProfilerBaseDirAsDirectory(fs: FileSystem, 
profilerDfsDir: String): Unit = {
+    if (!fs.getFileStatus(new Path(profilerDfsDir)).isDirectory) {
+      throw new IllegalArgumentException(
+        s"Profiler DFS base directory $profilerDfsDir is not a directory.")
+    }
+  }
+
   private def startWriting(): Unit = {
-    if (profilerDfsDir.isDefined) {
-      val applicationId = try {
-        conf.getAppId
-      } catch {
-        case _: NoSuchElementException => "local-" + System.currentTimeMillis
+    profilerDfsDirOpt.foreach { profilerDfsDir =>
+      val profilerDirForApp = s"$profilerDfsDir/$baseName"
+      val profileOutputFile = s"$profilerDirForApp/$profileFile"
+
+      val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+      val fs = Utils.getHadoopFileSystem(profilerDfsDir, hadoopConf)
+
+      requireProfilerBaseDirAsDirectory(fs, profilerDfsDir)
+
+      val profilerDirForAppPath = new Path(profilerDirForApp)
+      if (!fs.exists(profilerDirForAppPath)) {
+        // SPARK-30860: use the class method to avoid the umask causing 
permission issues
+        FileSystem.mkdirs(fs, profilerDirForAppPath, 
PROFILER_FOLDER_PERMISSIONS)
       }
-      val config = SparkHadoopUtil.get.newConfiguration(conf)
-      val appName = conf.get("spark.app.name").replace(" ", "-")
-      val profilerOutputDirname = profilerDfsDir.get
-
-      val profileOutputFile =
-        
s"$profilerOutputDirname/$applicationId/profile-$appName-exec-$executorId.jfr"
-      val fs = FileSystem.get(new URI(profileOutputFile), config);
-      val filenamePath = new Path(profileOutputFile)
-      outputStream = fs.create(filenamePath)
+
+      outputStream = FileSystem.create(fs, new Path(profileOutputFile), 
PROFILER_FILE_PERMISSIONS)
       try {
-        if (fs.exists(filenamePath)) {
-          fs.delete(filenamePath, true)
-        }
         logInfo(log"Copying executor profiling file to ${MDC(PATH, 
profileOutputFile)}")
-        inputStream = new BufferedInputStream(new 
FileInputStream(s"$profilerLocalDir/profile.jfr"))
+        inputStream = new BufferedInputStream(
+          new FileInputStream(s"$profilerLocalDir/$profileFile"))
         threadpool = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor("profilerOutputThread")
         threadpool.scheduleWithFixedDelay(
           new Runnable() {
@@ -158,14 +175,14 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, 
executorId: String) ex
     } catch {
       case e: IOException => logError("Exception occurred while writing some 
profiler output: ", e)
       case e @ (_: IllegalArgumentException | _: IllegalStateException) =>
-        logError("Some profiler output not written." +
-          " Exception occurred in profiler native code: ", e)
+        logError("Some profiler output not written. " +
+          "Exception occurred in profiler native code: ", e)
       case e: Exception => logError("Some profiler output not written. 
Unexpected exception: ", e)
     }
   }
 
   private def finishWriting(): Unit = {
-    if (profilerDfsDir.isDefined && writing) {
+    if (profilerDfsDirOpt.isDefined && writing) {
       try {
         // shutdown background writer
         threadpool.shutdown()
@@ -177,8 +194,8 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, 
executorId: String) ex
       } catch {
         case _: InterruptedException => Thread.currentThread().interrupt()
         case e: IOException =>
-          logWarning("Some profiling output not written." +
-            "Exception occurred while completing profiler output", e)
+          logWarning("Some profiling output not written. " +
+            "Exception occurred while completing profiler output: ", e)
       }
       writing = false
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to