This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e945a9031d4e [SPARK-50783] Canonicalize JVM profiler results file name and layout on DFS
e945a9031d4e is described below
commit e945a9031d4eb4f9893f9b1a744646c2cfb214c0
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Jan 13 19:21:34 2025 -0800
[SPARK-50783] Canonicalize JVM profiler results file name and layout on DFS
### What changes were proposed in this pull request?
This PR canonicalizes the profiling result files written to DFS by the JVM
profiler added in SPARK-46094 to
```
dfsDir/{{APP_ID}}/profile-exec-{{EXECUTOR_ID}}.jfr
```
which largely follows the event log file name pattern and layout.
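For illustration, here is a minimal Scala sketch of how such a path can be composed from the application ID, optional attempt ID, and executor ID. The helper name and inputs are illustrative; the actual change builds the base name with Spark's `Utils.nameForAppAndAttempt`, as visible in the diff below.
```
// Sketch only: compose the canonical DFS path for one executor's profile.
// `dfsDir`, `appId`, `appAttemptId`, and `executorId` are assumed inputs.
def profileOutputPath(
    dfsDir: String,
    appId: String,
    appAttemptId: Option[String],
    executorId: String): String = {
  // Event-log style base name: "<appId>" or "<appId>_<attemptId>"
  val baseName = appAttemptId.map(attempt => s"${appId}_$attempt").getOrElse(appId)
  s"$dfsDir/$baseName/profile-exec-$executorId.jfr"
}

// profileOutputPath("hdfs:///spark-profiling", "application_1736320707252_0023", Some("1"), "12")
// => hdfs:///spark-profiling/application_1736320707252_0023_1/profile-exec-12.jfr
```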
### Why are the changes needed?
According to
https://github.com/apache/spark/pull/44021#issuecomment-1863873954, we can
integrate the profiling results into the Spark UI (both live and history) in
the future, so it's good to follow the event log file name pattern and layout
as closely as possible.
### Does this PR introduce _any_ user-facing change?
No, it's an unreleased feature.
### How was this patch tested?
```
$ bin/spark-submit run-example \
--master yarn \
--deploy-mode cluster \
--conf spark.plugins=org.apache.spark.executor.profiler.ExecutorProfilerPlugin \
--conf spark.executor.profiling.enabled=true \
--conf spark.executor.profiling.dfsDir=hdfs:///spark-profiling \
--conf spark.executor.profiling.fraction=1 \
SparkPi 100000
```
```
hadoopspark-dev1:~/spark$ hadoop fs -ls /spark-profiling/
Found 1 items
drwxrwx--- - hadoop supergroup 0 2025-01-13 10:29 /spark-profiling/application_1736320707252_0023_1
```
```
hadoopspark-dev1:~/spark$ hadoop fs -ls /spark-profiling/application_1736320707252_0023_1
Found 48 items
-rw-rw---- 3 hadoop supergroup 5255028 2025-01-13 10:29 /spark-profiling/application_1736320707252_0023_1/profile-exec-1.jfr
-rw-rw---- 3 hadoop supergroup 3840775 2025-01-13 10:29 /spark-profiling/application_1736320707252_0023_1/profile-exec-10.jfr
-rw-rw---- 3 hadoop supergroup 3889002 2025-01-13 10:29 /spark-profiling/application_1736320707252_0023_1/profile-exec-11.jfr
-rw-rw---- 3 hadoop supergroup 3570697 2025-01-13 10:29 /spark-profiling/application_1736320707252_0023_1/profile-exec-12.jfr
...
```
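As a rough sketch (not part of this change, and assuming Hadoop client classes on the classpath), the same listing can be done programmatically via the Hadoop FileSystem API; the application directory name below is illustrative:
```
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch only: list the per-executor .jfr files uploaded for one application.
val appDir = new Path("hdfs:///spark-profiling/application_1736320707252_0023_1")
val fs = FileSystem.get(new URI(appDir.toString), new Configuration())
fs.listStatus(appDir)
  .filter(_.getPath.getName.endsWith(".jfr"))
  .foreach(status => println(s"${status.getPath.getName} ${status.getLen} bytes"))
```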
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #49440 from pan3793/SPARK-50783.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
connector/profiler/README.md | 6 +-
.../executor/profiler/ExecutorJVMProfiler.scala | 77 +++++++++++++---------
2 files changed, 50 insertions(+), 33 deletions(-)
diff --git a/connector/profiler/README.md b/connector/profiler/README.md
index 1326fd55df09..4d97b15eb96a 100644
--- a/connector/profiler/README.md
+++ b/connector/profiler/README.md
@@ -16,7 +16,7 @@ The profiler writes the jfr files to the executor's working directory in the exe
Code profiling is currently only supported for
* Linux (x64)
-* Linux (arm 64)
+* Linux (arm64)
* Linux (musl, x64)
* MacOS
@@ -54,7 +54,7 @@ Then enable the profiling in the configuration.
<td><code>spark.executor.profiling.dfsDir</code></td>
<td>(none)</td>
<td>
- An HDFS compatible path to which the profiler's output files are copied. The output files will be written as <i>dfsDir/application_id/profile-appname-exec-executor_id.jfr</i> <br/>
+ An HDFS compatible path to which the profiler's output files are copied. The output files will be written as <i>dfsDir/{{APP_ID}}/profile-exec-{{EXECUTOR_ID}}.jfr</i> <br/>
If no <i>dfsDir</i> is specified then the files are not copied over. Users should ensure there is sufficient disk space available otherwise it may lead to corrupt jfr files.
</td>
<td>4.0.0</td>
@@ -72,7 +72,7 @@ Then enable the profiling in the configuration.
<td>event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s</td>
<td>
Options to pass to the profiler. Detailed options are documented in the comments here:
- <a href="https://github.com/async-profiler/async-profiler/blob/32601bccd9e49adda9510a2ed79d142ac6ef0ff9/src/arguments.cpp#L52">Profiler arguments</a>.
+ <a href="https://github.com/async-profiler/async-profiler/blob/v3.0/src/arguments.cpp#L44">Profiler arguments</a>.
Note that the options to start, stop, specify output format, and output file do not have to be specified.
</td>
<td>4.0.0</td>
diff --git a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala
index 20b6db5221fa..94e5b46c6588 100644
--- a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala
+++ b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala
@@ -17,17 +17,17 @@
package org.apache.spark.executor.profiler
import java.io.{BufferedInputStream, FileInputStream, InputStream, IOException}
-import java.net.URI
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import one.profiler.{AsyncProfiler, AsyncProfilerLoader}
import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.PATH
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
/**
@@ -38,15 +38,26 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
private var running = false
private val enableProfiler = conf.get(EXECUTOR_PROFILING_ENABLED)
private val profilerOptions = conf.get(EXECUTOR_PROFILING_OPTIONS)
- private val profilerDfsDir = conf.get(EXECUTOR_PROFILING_DFS_DIR)
+ private val profilerDfsDirOpt = conf.get(EXECUTOR_PROFILING_DFS_DIR)
private val profilerLocalDir = conf.get(EXECUTOR_PROFILING_LOCAL_DIR)
private val writeInterval = conf.get(EXECUTOR_PROFILING_WRITE_INTERVAL)
- private val startcmd = s"start,$profilerOptions,file=$profilerLocalDir/profile.jfr"
- private val stopcmd = s"stop,$profilerOptions,file=$profilerLocalDir/profile.jfr"
- private val dumpcmd = s"dump,$profilerOptions,file=$profilerLocalDir/profile.jfr"
- private val resumecmd = s"resume,$profilerOptions,file=$profilerLocalDir/profile.jfr"
+ private val appId = try {
+ conf.getAppId
+ } catch {
+ case _: NoSuchElementException => "local-" + System.currentTimeMillis
+ }
+ private val appAttemptId = conf.getOption("spark.app.attempt.id")
+ private val baseName = Utils.nameForAppAndAttempt(appId, appAttemptId)
+ private val profileFile = s"profile-exec-$executorId.jfr"
+
+ private val startcmd = s"start,$profilerOptions,file=$profilerLocalDir/$profileFile"
+ private val stopcmd = s"stop,$profilerOptions,file=$profilerLocalDir/$profileFile"
+ private val dumpcmd = s"dump,$profilerOptions,file=$profilerLocalDir/$profileFile"
+ private val resumecmd = s"resume,$profilerOptions,file=$profilerLocalDir/$profileFile"
+ private val PROFILER_FOLDER_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)
+ private val PROFILER_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("660", 8).toShort)
private val UPLOAD_SIZE = 8 * 1024 * 1024 // 8 MB
private var outputStream: FSDataOutputStream = _
private var inputStream: InputStream = _
@@ -89,28 +100,34 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
}
}
+ private def requireProfilerBaseDirAsDirectory(fs: FileSystem, profilerDfsDir: String): Unit = {
+ if (!fs.getFileStatus(new Path(profilerDfsDir)).isDirectory) {
+ throw new IllegalArgumentException(
+ s"Profiler DFS base directory $profilerDfsDir is not a directory.")
+ }
+ }
+
private def startWriting(): Unit = {
- if (profilerDfsDir.isDefined) {
- val applicationId = try {
- conf.getAppId
- } catch {
- case _: NoSuchElementException => "local-" + System.currentTimeMillis
+ profilerDfsDirOpt.foreach { profilerDfsDir =>
+ val profilerDirForApp = s"$profilerDfsDir/$baseName"
+ val profileOutputFile = s"$profilerDirForApp/$profileFile"
+
+ val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+ val fs = Utils.getHadoopFileSystem(profilerDfsDir, hadoopConf)
+
+ requireProfilerBaseDirAsDirectory(fs, profilerDfsDir)
+
+ val profilerDirForAppPath = new Path(profilerDirForApp)
+ if (!fs.exists(profilerDirForAppPath)) {
+ // SPARK-30860: use the class method to avoid the umask causing permission issues
+ FileSystem.mkdirs(fs, profilerDirForAppPath, PROFILER_FOLDER_PERMISSIONS)
}
- val config = SparkHadoopUtil.get.newConfiguration(conf)
- val appName = conf.get("spark.app.name").replace(" ", "-")
- val profilerOutputDirname = profilerDfsDir.get
-
- val profileOutputFile =
- s"$profilerOutputDirname/$applicationId/profile-$appName-exec-$executorId.jfr"
- val fs = FileSystem.get(new URI(profileOutputFile), config);
- val filenamePath = new Path(profileOutputFile)
- outputStream = fs.create(filenamePath)
+
+ outputStream = FileSystem.create(fs, new Path(profileOutputFile), PROFILER_FILE_PERMISSIONS)
try {
- if (fs.exists(filenamePath)) {
- fs.delete(filenamePath, true)
- }
logInfo(log"Copying executor profiling file to ${MDC(PATH,
profileOutputFile)}")
- inputStream = new BufferedInputStream(new FileInputStream(s"$profilerLocalDir/profile.jfr"))
+ inputStream = new BufferedInputStream(
+ new FileInputStream(s"$profilerLocalDir/$profileFile"))
threadpool = ThreadUtils.newDaemonSingleThreadScheduledExecutor("profilerOutputThread")
threadpool.scheduleWithFixedDelay(
new Runnable() {
@@ -158,14 +175,14 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
} catch {
case e: IOException => logError("Exception occurred while writing some profiler output: ", e)
case e @ (_: IllegalArgumentException | _: IllegalStateException) =>
- logError("Some profiler output not written." +
- " Exception occurred in profiler native code: ", e)
+ logError("Some profiler output not written. " +
+ "Exception occurred in profiler native code: ", e)
case e: Exception => logError("Some profiler output not written. Unexpected exception: ", e)
}
}
private def finishWriting(): Unit = {
- if (profilerDfsDir.isDefined && writing) {
+ if (profilerDfsDirOpt.isDefined && writing) {
try {
// shutdown background writer
threadpool.shutdown()
@@ -177,8 +194,8 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
} catch {
case _: InterruptedException => Thread.currentThread().interrupt()
case e: IOException =>
- logWarning("Some profiling output not written." +
- "Exception occurred while completing profiler output", e)
+ logWarning("Some profiling output not written. " +
+ "Exception occurred while completing profiler output: ", e)
}
writing = false
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]