This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a1f3dcb628eb [SPARK-52509][CORE] Cleanup individual shuffles from
fallback storage on `RemoveShuffle` event
a1f3dcb628eb is described below
commit a1f3dcb628eb6848aefae261c29305f5e033c9df
Author: Enrico Minack <[email protected]>
AuthorDate: Fri Oct 24 22:07:49 2025 -0700
[SPARK-52509][CORE] Cleanup individual shuffles from fallback storage on
`RemoveShuffle` event
### What changes were proposed in this pull request?
With this change, the shuffle data of individual shuffles is deleted from the
fallback storage during regular shuffle cleanup, i.e. on the `RemoveShuffle` event.
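For context, the sketch below condenses the core of the change (see the
`FallbackStorage.scala` diff further down; this is not a verbatim, compilable
excerpt): the RPC endpoint registered for the fallback block manager now reacts
to `RemoveShuffle` messages and deletes only that shuffle's directory.
```scala
// Condensed from the FallbackStorage.scala diff below; imports and other members omitted.
private[storage] class FallbackStorageRpcEndpointRef(conf: SparkConf, hadoopConf: Configuration)
  extends RpcEndpointRef(conf) {
  override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
    message match {
      case RemoveShuffle(shuffleId) =>
        // delete only <fallbackPath>/<appId>/<shuffleId>, not the whole app directory
        FallbackStorage.cleanUp(conf, hadoopConf, Some(shuffleId))
      case _ => // other messages remain a no-op
    }
    Future { true.asInstanceOf[T] }
  }
}
```
Since this endpoint is registered with the block manager master, the regular
`RemoveShuffle` cleanup that reaches block managers now also reaches the fallback storage.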
### Why are the changes needed?
Currently, shuffle data is only removed from the fallback storage on Spark
context shutdown. Long-running Spark jobs therefore accumulate shuffle data in the
fallback storage even though Spark no longer uses it. Those shuffles should be
cleaned up while the Spark context is still running.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests and a manual test via the [reproduction
example](https://gist.github.com/EnricoMi/e9daa1176bce4c1211af3f3c5848112a/3140527bcbedec51ed2c571885db774c880cb941).
Run the reproduction example without the ` <<< "$scala"` part, then execute this
code in the Spark shell:
```scala
import org.apache.spark.sql.SaveMode
val n = 100000000
val j = spark.sparkContext.broadcast(1000)
val x = spark.range(0, n, 1, 100).select($"id".cast("int"))
x.as[Int]
  .mapPartitions { it => if (it.hasNext && it.next < n / 100 * 80) Thread.sleep(2000); it }
  .groupBy($"value" % 1000).as[Int, Int]
  .flatMapSortedGroups($"value") { case (m, it) => if (it.hasNext && it.next == 0) Thread.sleep(10000); it }
  .write.mode(SaveMode.Overwrite).csv("/tmp/spark.csv")
```
This writes some data of shuffle 0 to the fallback storage.
Invoking `System.gc()` removes that shuffle directory from the fallback
storage. Exiting the Spark shell removes the whole application directory.
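For reference, here is a minimal configuration sketch (not part of this patch) for
enabling the fallback storage together with the new per-shuffle cleanup; the keys are
those referenced in the diff below, while the concrete values (path, interval) are
only illustrative:
```scala
import org.apache.spark.SparkConf

// Illustrative values; the configuration keys are the ones used by this patch.
val conf = new SparkConf()
  // migrate shuffle blocks of decommissioned executors to the fallback storage
  .set("spark.storage.decommission.enabled", "true")
  .set("spark.storage.decommission.shuffleBlocks.enabled", "true")
  // fallback location; the path must end with a separator
  .set("spark.storage.decommission.fallbackStorage.path", "s3a://spark-storage/")
  // also clean up freed shuffles (and the app directory on shutdown) from the fallback storage
  .set("spark.storage.decommission.fallbackStorage.cleanUp", "true")
  // freed shuffles are detected via the periodic GC interval
  .set("spark.cleaner.periodicGC.interval", "30min")
```
With `cleanUp` left at its default of `false`, the previous behaviour remains and the
fallback storage has to be cleaned up by an external mechanism such as a TTL.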
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #51199 from EnricoMi/fallback-storage-cleanup.
Authored-by: Enrico Minack <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../main/scala/org/apache/spark/SparkContext.scala | 3 +-
.../org/apache/spark/internal/config/package.scala | 20 ++++++----
.../org/apache/spark/storage/FallbackStorage.scala | 24 +++++++++---
.../spark/storage/FallbackStorageSuite.scala | 45 ++++++++++++++++++++--
4 files changed, 73 insertions(+), 19 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index b0ac6d96a001..898bbad26b7e 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -641,7 +641,8 @@ class SparkContext(config: SparkConf) extends Logging {
}
_ui.foreach(_.setAppId(_applicationId))
_env.blockManager.initialize(_applicationId)
- FallbackStorage.registerBlockManagerIfNeeded(_env.blockManager.master, _conf)
+ FallbackStorage.registerBlockManagerIfNeeded(
+ _env.blockManager.master, _conf, _hadoopConfiguration)
// The metrics system for Driver need to be set spark.app.id to app ID.
// So it should start after we get app ID from the task scheduler and set spark.app.id.
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 9a4ef1fbd7f6..9876848f654a 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -604,23 +604,27 @@ package object config {
"cache block replication should be positive.")
.createWithDefaultString("30s")
+ private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP =
+ ConfigBuilder("spark.storage.decommission.fallbackStorage.cleanUp")
+ .doc("If true, Spark cleans up its fallback storage data once individual
shuffles are " +
+ "freed (interval configured via spark.cleaner.periodicGC.interval),
and during " +
+ "shutting down.")
+ .version("3.2.0")
+ .booleanConf
+ .createWithDefault(false)
+
private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH =
ConfigBuilder("spark.storage.decommission.fallbackStorage.path")
.doc("The location for fallback storage during block manager
decommissioning. " +
"For example, `s3a://spark-storage/`. In case of empty, fallback
storage is disabled. " +
- "The storage should be managed by TTL because Spark will not clean it
up.")
+ "The storage will not be cleaned up by Spark unless " +
+ s"${STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP.key} is true. " +
+ "Use an external clean up mechanism when false, for instance a TTL.")
.version("3.1.0")
.stringConf
.checkValue(_.endsWith(java.io.File.separator), "Path should end with separator.")
.createOptional
- private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP =
- ConfigBuilder("spark.storage.decommission.fallbackStorage.cleanUp")
- .doc("If true, Spark cleans up its fallback storage data during shutting
down.")
- .version("3.2.0")
- .booleanConf
- .createWithDefault(false)
-
private[spark] val STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE =
ConfigBuilder("spark.storage.decommission.shuffleBlocks.maxDiskSize")
.doc("Maximum disk space to use to store shuffle blocks before rejecting
remote " +
diff --git a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
index c086be0bfe80..19cdebd80ebf 100644
--- a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
+++ b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
@@ -36,6 +36,7 @@ import org.apache.spark.network.util.JavaUtils
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
+import org.apache.spark.storage.BlockManagerMessages.RemoveShuffle
import org.apache.spark.util.Utils
/**
@@ -95,7 +96,8 @@ private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
}
}
-private[storage] class NoopRpcEndpointRef(conf: SparkConf) extends RpcEndpointRef(conf) {
+private[storage] class FallbackStorageRpcEndpointRef(conf: SparkConf, hadoopConf: Configuration)
+ extends RpcEndpointRef(conf) {
// scalastyle:off executioncontextglobal
import scala.concurrent.ExecutionContext.Implicits.global
// scalastyle:on executioncontextglobal
@@ -103,6 +105,11 @@ private[storage] class NoopRpcEndpointRef(conf: SparkConf) extends RpcEndpointRe
override def name: String = "fallback"
override def send(message: Any): Unit = {}
override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
+ message match {
+ case RemoveShuffle(shuffleId) =>
+ FallbackStorage.cleanUp(conf, hadoopConf, Some(shuffleId))
+ case _ => // no-op
+ }
Future{true.asInstanceOf[T]}
}
}
@@ -120,20 +127,25 @@ private[spark] object FallbackStorage extends Logging {
}
/** Register the fallback block manager and its RPC endpoint. */
- def registerBlockManagerIfNeeded(master: BlockManagerMaster, conf: SparkConf): Unit = {
+ def registerBlockManagerIfNeeded(
+ master: BlockManagerMaster,
+ conf: SparkConf,
+ hadoopConf: Configuration): Unit = {
if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
master.registerBlockManager(
- FALLBACK_BLOCK_MANAGER_ID, Array.empty[String], 0, 0, new NoopRpcEndpointRef(conf))
+ FALLBACK_BLOCK_MANAGER_ID, Array.empty[String], 0, 0,
+ new FallbackStorageRpcEndpointRef(conf, hadoopConf))
}
}
- /** Clean up the generated fallback location for this app. */
- def cleanUp(conf: SparkConf, hadoopConf: Configuration): Unit = {
+ /** Clean up the generated fallback location for this app (and shuffle id if given). */
+ def cleanUp(conf: SparkConf, hadoopConf: Configuration, shuffleId: Option[Int] = None): Unit = {
if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined &&
conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP) &&
conf.contains("spark.app.id")) {
- val fallbackPath =
+ val fallbackPath = shuffleId.foldLeft(
new Path(conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).get, conf.getAppId)
+ ) { case (path, shuffleId) => new Path(path, shuffleId.toString) }
val fallbackUri = fallbackPath.toUri
val fallbackFileSystem = FileSystem.get(fallbackUri, hadoopConf)
// The fallback directory for this app may not be created yet.
diff --git a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
index 6c51bd4ff2e2..6df8bc85b510 100644
--- a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
@@ -30,6 +30,7 @@ import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TestUtils}
import org.apache.spark.LocalSparkContext.withSpark
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.config._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.launcher.SparkLauncher.{EXECUTOR_MEMORY, SPARK_MASTER}
@@ -67,8 +68,10 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
.set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
.set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")
+ val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+ val rpcEndpointRef = new FallbackStorageRpcEndpointRef(conf, hadoopConf)
val fallbackStorage = new FallbackStorage(conf)
- val bmm = new BlockManagerMaster(new NoopRpcEndpointRef(conf), null, conf, false)
+ val bmm = new BlockManagerMaster(rpcEndpointRef, null, conf, false)
val bm = mock(classOf[BlockManager])
val dbm = new DiskBlockManager(conf, deleteFilesOnStop = false, isDriver = false)
@@ -118,8 +121,10 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
.set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
.set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
"file://" + Files.createTempDirectory("tmp").toFile.getAbsolutePath +
"/")
+ val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+ val rpcEndpointRef = new FallbackStorageRpcEndpointRef(conf, hadoopConf)
val fallbackStorage = new FallbackStorage(conf)
- val bmm = new BlockManagerMaster(new NoopRpcEndpointRef(conf), null, conf, false)
+ val bmm = new BlockManagerMaster(rpcEndpointRef, null, conf, false)
val bm = mock(classOf[BlockManager])
val dbm = new DiskBlockManager(conf, deleteFilesOnStop = false, isDriver = false)
@@ -153,7 +158,7 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
assert(readResult.nioByteBuffer().array().sameElements(content))
}
- test("SPARK-34142: fallback storage API - cleanUp") {
+ test("SPARK-34142: fallback storage API - cleanUp app") {
withTempDir { dir =>
Seq(true, false).foreach { cleanUp =>
val appId = s"test$cleanUp"
@@ -165,8 +170,38 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
val location = new File(dir, appId)
assert(location.mkdir())
assert(location.exists())
+ val shuffle = new File(location, "1")
+ assert(shuffle.mkdir())
+ assert(shuffle.exists())
FallbackStorage.cleanUp(conf, new Configuration())
assert(location.exists() != cleanUp)
+ assert(shuffle.exists() != cleanUp)
+ }
+ }
+ }
+
+ test("SPARK-34142: fallback storage API - cleanUp shuffle") {
+ withTempDir { dir =>
+ Seq(true, false).foreach { cleanUp =>
+ val appId = s"test$cleanUp"
+ val conf = new SparkConf(false)
+ .set("spark.app.id", appId)
+ .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH, dir.getAbsolutePath + "/")
+ .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, cleanUp)
+
+ val location = new File(dir, appId)
+ assert(location.mkdir())
+ assert(location.exists())
+ val shuffle1 = new File(location, "1")
+ assert(shuffle1.mkdir())
+ assert(shuffle1.exists())
+ val shuffle2 = new File(location, "2")
+ assert(shuffle2.mkdir())
+ assert(shuffle2.exists())
+ FallbackStorage.cleanUp(conf, new Configuration(), Some(1))
+ assert(location.exists())
+ assert(shuffle1.exists() != cleanUp)
+ assert(shuffle2.exists())
}
}
}
@@ -177,6 +212,8 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
.set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
.set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")
+ val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+ val rpcEndpointRef = new FallbackStorageRpcEndpointRef(conf, hadoopConf)
val ids = Set((1, 1L, 1))
val bm = mock(classOf[BlockManager])
@@ -202,7 +239,7 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
when(bm.getPeers(mc.any()))
.thenReturn(Seq(FallbackStorage.FALLBACK_BLOCK_MANAGER_ID))
- val bmm = new BlockManagerMaster(new NoopRpcEndpointRef(conf), null, conf, false)
+ val bmm = new BlockManagerMaster(rpcEndpointRef, null, conf, false)
when(bm.master).thenReturn(bmm)
val blockTransferService = mock(classOf[BlockTransferService])
when(blockTransferService.uploadBlockSync(mc.any(), mc.any(), mc.any(), mc.any(), mc.any(),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]