This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 8da592dc2f92 [SPARK-44639][SS][YARN] Use Java tmp dir for local RocksDB state storage on Yarn
8da592dc2f92 is described below
commit 8da592dc2f92a64255b3b4335e82dd382c21873e
Author: Adam Binford <[email protected]>
AuthorDate: Sat Nov 29 12:52:27 2025 -0800
[SPARK-44639][SS][YARN] Use Java tmp dir for local RocksDB state storage on Yarn
### What changes were proposed in this pull request?
Update the RocksDB state store to keep its local data underneath
`java.io.tmpdir` instead of going through `Utils.getLocalDir` when running on
YARN. This is done through a new util method `createExecutorLocalTempDir`, as
there may be other use cases for this behavior as well.
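As a rough sketch of how a Spark-internal caller might use the new helper
(note that `Utils` is `private[spark]`, and the `"rocksdb-scratch"` prefix is
just an illustrative value):
```
import java.io.File

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

val conf: SparkConf = new SparkConf()
// On YARN this resolves under java.io.tmpdir (inside the container
// directory, which is removed with the container); elsewhere it falls
// back to createTempDir(getLocalDir(conf), namePrefix) as before.
val scratchDir: File = Utils.createExecutorLocalTempDir(conf, namePrefix = "rocksdb-scratch")
```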
### Why are the changes needed?
On YARN, the local RocksDB directory is created inside the root
application folder, such as:
```
/tmp/nm-local-dir/usercache/<user>/appcache/<app_id>/spark-<uuid>/StateStoreId(...)
```
The problem is that if an executor crashes for some reason (like an
OOM) and the shutdown hooks don't get run, this directory stays around until
the application finishes, so jobs can slowly accumulate more and more
temporary space until the node manager finally goes unhealthy.
Because this data is only ever accessed by the executor that created it,
it makes sense to store it inside the container folder, which the node
manager always cleans up when the YARN container is torn down. YARN sets
`java.io.tmpdir` to a path inside this directory, such as:
```
/tmp/nm-local-dir/usercache/<user>/appcache/<app_id>/<container_id>/tmp/StateStoreId(...)
```
It looks like only YARN sets the tmpdir property; the other resource
managers (standalone and k8s) always rely on the local dirs setting/env vars.
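As a rough illustration (not part of this change): Spark detects a YARN
container through the `CONTAINER_ID` environment variable that the node
manager exports, which is also what the new test fakes via
`SparkConfWithEnv`, and YARN launches the JVM with `-Djava.io.tmpdir`
pointing inside the container's working directory:
```
// CONTAINER_ID is set by the YARN node manager for launched containers;
// Utils.isRunningInYarnContainer keys off the same variable.
val inYarnContainer = sys.env.contains("CONTAINER_ID")
// YARN starts the JVM with -Djava.io.tmpdir=<container_dir>/tmp, so the
// default temp dir is cleaned up together with the container.
val tmpDir = System.getProperty("java.io.tmpdir")
println(s"inYarnContainer=$inYarnContainer, tmpdir=$tmpDir")
```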
### Does this PR introduce _any_ user-facing change?
There shouldn't be any effective changes, other than preventing disk space
from filling up on node managers in certain scenarios.
### How was this patch tested?
New UT
Closes #42301 from Kimahriman/rocksdb-tmp-dir.
Authored-by: Adam Binford <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 9e9358a309d2ea2236beb4bdb67091fa5c382b28)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../main/scala/org/apache/spark/util/Utils.scala | 16 +++++++++++++
.../sql/execution/streaming/state/RocksDB.scala | 2 +-
.../state/RocksDBStateStoreProvider.scala | 6 +++--
.../execution/streaming/state/RocksDBSuite.scala | 26 +++++++++++++++++++++-
4 files changed, 46 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 721719f2a976..0907d6d049bf 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -250,6 +250,22 @@ private[spark] object Utils
     dir
   }
+  /**
+   * Create a temporary directory that will always be cleaned up when the executor stops,
+   * even in the case of a hard shutdown when the shutdown hooks don't get run.
+   *
+   * Currently this only provides special behavior on YARN, where the local dirs are not
+   * guaranteed to be cleaned up on an executor's hard shutdown.
+   */
+  def createExecutorLocalTempDir(conf: SparkConf, namePrefix: String): File = {
+    if (Utils.isRunningInYarnContainer(conf)) {
+      // Just use the default Java tmp dir which is set to inside the container directory on YARN
+      createTempDir(namePrefix = namePrefix)
+    } else {
+      createTempDir(getLocalDir(conf), namePrefix)
+    }
+  }
+
   /**
    * Copy the first `maxSize` bytes of data from the InputStream to an in-memory
    * buffer, primarily to check for corruption.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index aa02d708933d..b12cdb9bba95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -69,7 +69,7 @@ case object StoreTaskCompletionListener extends RocksDBOpType("store_task_comple
 class RocksDB(
     dfsRootDir: String,
     val conf: RocksDBConf,
-    localRootDir: File = Utils.createTempDir(),
+    val localRootDir: File = Utils.createTempDir(),
     hadoopConf: Configuration = new Configuration,
     loggingId: String = "",
     useColumnFamilies: Boolean = false,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index 2cc4c8a870ae..1058c02c9304 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -837,6 +837,9 @@ private[sql] class RocksDBStateStoreProvider
   @volatile private var stateSchemaProvider: Option[StateSchemaProvider] = _
   @volatile private var rocksDBEventForwarder: Option[RocksDBEventForwarder] = _
   @volatile private var stateStoreProviderId: StateStoreProviderId = _
+  // Exposed for testing
+  @volatile private[sql] var sparkConf: SparkConf = Option(SparkEnv.get).map(_.conf)
+    .getOrElse(new SparkConf)
 
   protected def createRocksDB(
       dfsRootDir: String,
@@ -867,8 +870,7 @@ private[sql] class RocksDBStateStoreProvider
     val storeIdStr = s"StateStoreId(opId=${stateStoreId.operatorId}," +
       s"partId=${stateStoreId.partitionId},name=${stateStoreId.storeName})"
     val loggingId = stateStoreProviderId.toString
-    val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf)
-    val localRootDir = Utils.createTempDir(Utils.getLocalDir(sparkConf), storeIdStr)
+    val localRootDir = Utils.createExecutorLocalTempDir(sparkConf, storeIdStr)
     createRocksDB(dfsRootDir, RocksDBConf(storeConf), localRootDir, hadoopConf, loggingId,
       useColumnFamilies, storeConf.enableStateStoreCheckpointIds, stateStoreId.partitionId,
       rocksDBEventForwarder,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index da6c3e62798e..6c22436c29a0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -44,6 +44,7 @@ import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
 import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, FileContextBasedCheckpointFileManager, FileSystemBasedCheckpointFileManager}
 import org.apache.spark.sql.execution.streaming.checkpointing.CheckpointFileManager.{CancellableFSDataOutputStream, RenameBasedFSDataOutputStream}
+import org.apache.spark.sql.execution.streaming.runtime.StreamExecution
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS
 import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
@@ -51,7 +52,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.tags.SlowSQLTest
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{SparkConfWithEnv, ThreadUtils, Utils}
 import org.apache.spark.util.ArrayImplicits._
 class NoOverwriteFileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
@@ -3791,6 +3792,29 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
     }}
   }
 
+ test("SPARK-44639: Use Java tmp dir instead of configured local dirs on
Yarn") {
+ val conf = new Configuration()
+ conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
+
+ val provider = new RocksDBStateStoreProvider()
+ provider.sparkConf = new SparkConfWithEnv(Map("CONTAINER_ID" -> "1"))
+ provider.init(
+ StateStoreId(
+ "/checkpoint",
+ 0,
+ 0
+ ),
+ new StructType(),
+ new StructType(),
+ NoPrefixKeyStateEncoderSpec(new StructType()),
+ false,
+ new StateStoreConf(sqlConf),
+ conf
+ )
+
+ assert(provider.rocksDB.localRootDir.getParent() ==
System.getProperty("java.io.tmpdir"))
+ }
+
   private def dbConf = RocksDBConf(StateStoreConf(SQLConf.get.clone()))
 
 class RocksDBCheckpointFormatV2(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]