This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new fb5cf5f90c6 [SPARK-41694][CORE] Isolate RocksDB path for Live UI and
automatically cleanup when `SparkContext.stop()`
fb5cf5f90c6 is described below
commit fb5cf5f90c6fe0860c811c0f7e06b9d8255d1772
Author: yangjie01 <[email protected]>
AuthorDate: Wed Jan 4 20:32:43 2023 -0800
[SPARK-41694][CORE] Isolate RocksDB path for Live UI and automatically
cleanup when `SparkContext.stop()`
### What changes were proposed in this pull request?
This pr brings two fixes:
- Add sub-dir with `spark-ui` prefix under `spark.ui.store.path` for each
Spark App to ensure that multiple Spark Apps can run normally using the same
Spark Client with the same `spark.ui.store.path` configuration
- Automatically cleanup Live UI data when `SparkContext.stop()`
### Why are the changes needed?
There are 2 issues before this PR:
1. Multiple Spark Apps can't run normally using the same Spark Client with
the same `spark.ui.store.path` configuration; the following exceptions will occur:
```
org.rocksdb.RocksDBException: While lock file:
/${baseDir}/listing.rdb/LOCK: Resource temporarily unavailable
```
At the same time, only one Spark App can run normally using RocksDB as the
Live UI store.
After this pr, each Spark App uses an independent RocksDB directory when
`spark.ui.store.path` is specified as Live UI store.
2. `spark.ui.store.path` directory not clean up when `SparkContext.stop()`:
- The disk space occupied by the `spark.ui.store.path` directory will
continue to grow.
- When submitting new App and reusing the `spark.ui.store.path`
directory, we will see the content related to the previous App, which is a bit
strange
After this pr, the `spark.ui.store.path` directory is automatically cleaned
up by default when `SparkContext` stops.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Add new UTs
Closes #39226 from LuciferYang/SPARK-41694.
Lead-authored-by: yangjie01 <[email protected]>
Co-authored-by: YangJie <[email protected]>
Co-authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../org/apache/spark/status/AppStatusStore.scala | 29 +++++++++--
.../spark/status/AutoCleanupLiveUIDirSuite.scala | 56 ++++++++++++++++++++++
2 files changed, 81 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index 70fcbfd2d51..6db2fa57833 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -18,12 +18,14 @@
package org.apache.spark.status
import java.io.File
+import java.io.IOException
import java.util.{List => JList}
import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
import org.apache.spark.{JobExecutionStatus, SparkConf, SparkContext}
+import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Status.LIVE_UI_LOCAL_STORE_DIR
import org.apache.spark.status.api.v1
import org.apache.spark.storage.FallbackStorage.FALLBACK_BLOCK_MANAGER_ID
@@ -36,7 +38,8 @@ import org.apache.spark.util.kvstore.KVStore
*/
private[spark] class AppStatusStore(
val store: KVStore,
- val listener: Option[AppStatusListener] = None) {
+ val listener: Option[AppStatusListener] = None,
+ val storePath: Option[File] = None) {
def applicationInfo(): v1.ApplicationInfo = {
try {
@@ -733,6 +736,11 @@ private[spark] class AppStatusStore(
def close(): Unit = {
store.close()
+ cleanUpStorePath()
+ }
+
+ private def cleanUpStorePath(): Unit = {
+ storePath.foreach(Utils.deleteRecursively)
}
def constructTaskDataList(taskDataWrapperIter: Iterable[TaskDataWrapper]):
Seq[v1.TaskData] = {
@@ -761,7 +769,7 @@ private[spark] class AppStatusStore(
}
}
-private[spark] object AppStatusStore {
+private[spark] object AppStatusStore extends Logging {
val CURRENT_VERSION = 2L
@@ -771,10 +779,23 @@ private[spark] object AppStatusStore {
def createLiveStore(
conf: SparkConf,
appStatusSource: Option[AppStatusSource] = None): AppStatusStore = {
- val storePath = conf.get(LIVE_UI_LOCAL_STORE_DIR).map(new File(_))
+
+ def createStorePath(rootDir: String): Option[File] = {
+ try {
+ val localDir = Utils.createDirectory(rootDir, "spark-ui")
+ logInfo(s"Created spark ui store directory at $rootDir")
+ Some(localDir)
+ } catch {
+ case e: IOException =>
+ logError(s"Failed to create spark ui store path in $rootDir.", e)
+ None
+ }
+ }
+
+ val storePath = conf.get(LIVE_UI_LOCAL_STORE_DIR).flatMap(createStorePath)
val kvStore = KVUtils.createKVStore(storePath, live = true, conf)
val store = new ElementTrackingStore(kvStore, conf)
val listener = new AppStatusListener(store, conf, true, appStatusSource)
- new AppStatusStore(store, listener = Some(listener))
+ new AppStatusStore(store, listener = Some(listener), storePath)
}
}
diff --git
a/core/src/test/scala/org/apache/spark/status/AutoCleanupLiveUIDirSuite.scala
b/core/src/test/scala/org/apache/spark/status/AutoCleanupLiveUIDirSuite.scala
new file mode 100644
index 00000000000..f717299a1ed
--- /dev/null
+++
b/core/src/test/scala/org/apache/spark/status/AutoCleanupLiveUIDirSuite.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status
+
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.internal.config.Status.LIVE_UI_LOCAL_STORE_DIR
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.Utils
+
+class AutoCleanupLiveUIDirSuite extends SparkFunSuite {
+
+ test("SPARK-41694: Auto cleanup Spark UI store path") {
+ val baseUIDir = Utils.createTempDir()
+ try {
+ val conf = new
SparkConf().setAppName("ui-dir-cleanup").setMaster("local")
+ .set(LIVE_UI_LOCAL_STORE_DIR, baseUIDir.getCanonicalPath)
+ val sc = new SparkContext(conf)
+ sc.parallelize(0 until 100, 10)
+ .map { x => (x % 10) -> x }
+ .reduceByKey {
+ _ + _
+ }
+ .collect()
+ // `baseUIDir` should exist and not be empty before SparkContext stops.
+ assert(baseUIDir.exists())
+ val subDirs = baseUIDir.listFiles()
+ assert(subDirs.nonEmpty)
+ val uiDirs = subDirs.filter(_.getName.startsWith("spark-ui"))
+ assert(uiDirs.length == 1)
+ assert(uiDirs.head.listFiles().nonEmpty)
+ sc.stop()
+ // base dir should exist
+ assert(baseUIDir.exists())
+ assert(!uiDirs.head.exists())
+ assert(baseUIDir.listFiles().isEmpty)
+ } finally {
+ JavaUtils.deleteRecursively(baseUIDir)
+ assert(!baseUIDir.exists())
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]